1use std::{
8 collections::HashMap,
9 sync::{Arc, Mutex},
10 time::{Duration, Instant},
11};
12
13use tracing::info;
14
15use crate::{
16 app::{PhaseTimer, UserError},
17 core::{
18 ConsensusPlugin, Conversation, ConversationConfig, ConversationHandle,
19 ConversationPluginsFactory, ConversationState, ConversationStateMachine, PluginConsensus,
20 SessionEvent,
21 },
22 ds::{OutboundPacket, SharedDeliveryService},
23};
24
25pub(crate) fn send_packet(
30 transport: &SharedDeliveryService,
31 packet: OutboundPacket,
32) -> Result<(), UserError> {
33 transport
34 .lock()
35 .map_err(|_| UserError::LockPoisoned("transport"))?
36 .publish(packet)?;
37 Ok(())
38}
39
40#[derive(Debug, Clone, Copy)]
47pub(crate) struct AutoVoteEntry {
48 pub(crate) fire_at: Instant,
49 pub(crate) vote: bool,
50}
51
52pub struct SessionRunner<P: ConsensusPlugin, CP: ConversationPluginsFactory> {
53 pub(crate) conversation_name: String,
56 pub(crate) handle: ConversationHandle<CP>,
57 pub consensus: PluginConsensus<P>,
65 phase_timer: PhaseTimer,
68 pub(crate) pending_auto_votes: HashMap<u32, AutoVoteEntry>,
73 pub(crate) pending_consensus_timeouts: HashMap<u32, Instant>,
79 transport: SharedDeliveryService,
83 pub(crate) self_identity: Arc<[u8]>,
87 pub(crate) identity_display: Arc<str>,
91 pub(crate) app_id: Arc<[u8]>,
94 pending_events: Mutex<Vec<SessionEvent>>,
98}
99
100impl<P: ConsensusPlugin, CP: ConversationPluginsFactory> SessionRunner<P, CP> {
101 #[allow(clippy::too_many_arguments)]
105 pub(crate) fn new(
106 conversation_name: String,
107 conversation: Conversation,
108 mls: Option<CP::Mls>,
109 state_machine: ConversationStateMachine,
110 phase_timer: PhaseTimer,
111 config: ConversationConfig,
112 scoring: CP::Scoring,
113 steward_list: CP::StewardList,
114 consensus: PluginConsensus<P>,
115 transport: SharedDeliveryService,
116 self_identity: Arc<[u8]>,
117 identity_display: Arc<str>,
118 app_id: Arc<[u8]>,
119 ) -> Self {
120 Self {
121 conversation_name,
122 handle: ConversationHandle::new(
123 conversation,
124 mls,
125 state_machine,
126 config,
127 scoring,
128 steward_list,
129 ),
130 consensus,
131 phase_timer,
132 pending_auto_votes: HashMap::new(),
133 pending_consensus_timeouts: HashMap::new(),
134 transport,
135 self_identity,
136 identity_display,
137 app_id,
138 pending_events: Mutex::new(Vec::new()),
139 }
140 }
141
142 pub(crate) fn emit_event(&self, event: SessionEvent) {
148 if let Ok(mut buf) = self.pending_events.lock() {
149 buf.push(event);
150 }
151 }
152
153 pub fn drain_events(&self) -> Vec<SessionEvent> {
157 match self.pending_events.lock() {
158 Ok(mut buf) => std::mem::take(&mut *buf),
159 Err(_) => Vec::new(),
160 }
161 }
162
163 pub(crate) fn transport(&self) -> &SharedDeliveryService {
167 &self.transport
168 }
169
170 pub(crate) fn register_auto_vote(&mut self, proposal_id: u32, delay: Duration, vote: bool) {
176 self.pending_auto_votes.insert(
177 proposal_id,
178 AutoVoteEntry {
179 fire_at: Instant::now() + delay,
180 vote,
181 },
182 );
183 }
184
185 pub(crate) fn cancel_auto_vote(&mut self, proposal_id: u32) {
189 self.pending_auto_votes.remove(&proposal_id);
190 }
191
192 pub(crate) fn cancel_all_auto_votes(&mut self) {
195 self.pending_auto_votes.clear();
196 }
197
198 pub(crate) fn register_consensus_timeout(&mut self, proposal_id: u32, delay: Duration) {
201 self.pending_consensus_timeouts
202 .insert(proposal_id, Instant::now() + delay);
203 }
204
205 pub(crate) fn unregister_consensus_timeout(&mut self, proposal_id: u32) {
210 self.pending_consensus_timeouts.remove(&proposal_id);
211 }
212
213 pub(crate) fn start_working(&mut self) -> ConversationState {
216 self.handle.state_machine.start_working();
217 self.phase_timer.clear();
218 info!(state = "Working", "state transition");
219 ConversationState::Working
220 }
221
222 pub(crate) fn start_freezing(&mut self) -> ConversationState {
223 self.handle.state_machine.start_freezing();
224 self.phase_timer.start();
225 info!(state = "Freezing", "state transition");
226 ConversationState::Freezing
227 }
228
229 pub(crate) fn force_freezing(&mut self) -> Option<ConversationState> {
233 if self.handle.state_machine.force_freezing() {
234 self.phase_timer.start();
235 info!(state = "Freezing", "state transition (forced)");
236 Some(ConversationState::Freezing)
237 } else {
238 None
239 }
240 }
241
242 pub(crate) fn start_selection(&mut self) -> ConversationState {
243 self.handle.state_machine.start_selection();
244 info!(state = "Selection", "state transition");
245 ConversationState::Selection
246 }
247
248 pub(crate) fn start_reelection(&mut self) -> ConversationState {
249 self.handle.state_machine.start_reelection();
250 self.phase_timer.clear();
251 info!(state = "Reelection", "state transition");
252 ConversationState::Reelection
253 }
254
255 pub(crate) fn is_pending_join_expired(&self) -> bool {
258 self.handle.current_state() == ConversationState::PendingJoin
259 && self
260 .phase_timer
261 .elapsed_since_anchor(self.handle.config.commit_inactivity_duration * 3)
262 }
263
264 pub(crate) fn is_freeze_timed_out(&self) -> bool {
266 self.handle.current_state() == ConversationState::Freezing
267 && self
268 .phase_timer
269 .elapsed_since_anchor(self.handle.config.freeze_duration)
270 }
271
272 pub(crate) fn check_steward_inactivity(
278 &mut self,
279 approved_proposals_count: usize,
280 inactivity_duration: Duration,
281 ) -> Option<ConversationState> {
282 if self.handle.current_state() != ConversationState::Working
283 || approved_proposals_count == 0
284 {
285 return None;
286 }
287 if self.phase_timer.started_at().is_none() {
288 self.phase_timer.start();
289 info!(
290 approved = approved_proposals_count,
291 inactivity_ms = inactivity_duration.as_millis() as u64,
292 "inactivity timer started"
293 );
294 return None;
295 }
296 if !self.phase_timer.elapsed_since_anchor(inactivity_duration) {
297 return None;
298 }
299 info!(
300 inactivity_ms = inactivity_duration.as_millis() as u64,
301 approved = approved_proposals_count,
302 "inactivity window elapsed, entering freeze"
303 );
304 Some(self.start_freezing())
305 }
306}
307
308#[cfg(test)]
309mod tests {
310 use std::sync::Mutex;
311 use std::time::Instant;
312
313 use super::*;
314 use crate::core::Conversation;
315 use crate::defaults::DefaultConsensusPlugin;
316 use crate::test_fixtures::{
317 StubPluginsFactory, StubScoring, StubStewardList, UnusedMls, make_test_consensus_service,
318 };
319
320 fn make_runner_pending_join(
321 commit_inactivity: Duration,
322 ) -> SessionRunner<DefaultConsensusPlugin, StubPluginsFactory> {
323 let config = ConversationConfig {
324 commit_inactivity_duration: commit_inactivity,
325 ..ConversationConfig::default()
326 };
327 let mut runner = SessionRunner::new(
328 "g".to_string(),
329 Conversation::new("g"),
330 Some(UnusedMls),
331 ConversationStateMachine::new_as_pending_join(),
332 PhaseTimer::new(),
333 config,
334 StubScoring,
335 StubStewardList::member(),
336 make_test_consensus_service(),
337 Arc::new(Mutex::new(crate::test_fixtures::UnusedTransport)),
338 Arc::from(&b"test-identity"[..]),
339 Arc::from("0xtest-display"),
340 Arc::from(&[0u8; 16][..]),
341 );
342 runner.phase_timer.start();
343 runner
344 }
345
346 fn make_runner_working() -> SessionRunner<DefaultConsensusPlugin, StubPluginsFactory> {
347 SessionRunner::new(
348 "g".to_string(),
349 Conversation::new("g"),
350 Some(UnusedMls),
351 ConversationStateMachine::new_as_member(),
352 PhaseTimer::new(),
353 ConversationConfig::default(),
354 StubScoring,
355 StubStewardList::member(),
356 make_test_consensus_service(),
357 Arc::new(Mutex::new(crate::test_fixtures::UnusedTransport)),
358 Arc::from(&b"test-identity"[..]),
359 Arc::from("0xtest-display"),
360 Arc::from(&[0u8; 16][..]),
361 )
362 }
363
364 #[test]
368 fn pending_join_expires_after_three_times_commit_inactivity() {
369 let inactivity = Duration::from_millis(50);
370 let mut runner = make_runner_pending_join(inactivity);
371
372 assert!(
373 !runner.is_pending_join_expired(),
374 "fresh anchor must not be expired"
375 );
376
377 runner
379 .phase_timer
380 .set_started_at_for_test(Some(Instant::now() - inactivity * 5 / 2));
381 assert!(
382 !runner.is_pending_join_expired(),
383 "before 3× boundary must not be expired"
384 );
385
386 runner
388 .phase_timer
389 .set_started_at_for_test(Some(Instant::now() - inactivity * 4));
390 assert!(
391 runner.is_pending_join_expired(),
392 "past 3× boundary must be expired"
393 );
394 }
395
396 #[test]
399 fn pending_join_expired_only_in_pending_join_state() {
400 let mut runner = make_runner_working();
401 runner
402 .phase_timer
403 .set_started_at_for_test(Some(Instant::now() - Duration::from_secs(3600)));
404 assert!(
405 !runner.is_pending_join_expired(),
406 "Working state must never report pending-join-expired"
407 );
408 }
409
410 #[test]
413 fn check_steward_inactivity_first_tick_anchors_and_returns_none() {
414 let mut runner = make_runner_working();
415 assert_eq!(runner.handle.current_state(), ConversationState::Working);
416 assert!(
417 runner.phase_timer.started_at().is_none(),
418 "fresh runner has no anchor"
419 );
420
421 let result =
422 runner.check_steward_inactivity(1, Duration::from_secs(10));
423
424 assert_eq!(result, None, "first tick auto-anchors and returns None");
425 assert!(
426 runner.phase_timer.started_at().is_some(),
427 "anchor must be set after first tick"
428 );
429 assert_eq!(
430 runner.handle.current_state(),
431 ConversationState::Working,
432 "state must stay Working until inactivity actually elapses"
433 );
434
435 let result =
436 runner.check_steward_inactivity(1, Duration::from_secs(10));
437 assert_eq!(
438 result, None,
439 "second tick before timeout still returns None"
440 );
441 }
442
443 #[test]
445 fn check_steward_inactivity_noop_without_approved_work() {
446 let mut runner = make_runner_working();
447 let result = runner.check_steward_inactivity(0, Duration::from_secs(10));
448 assert_eq!(result, None);
449 assert!(
450 runner.phase_timer.started_at().is_none(),
451 "no approved work must not start the timer"
452 );
453 }
454
455 #[test]
461 fn emit_event_then_drain_returns_insertion_order_and_clears_buffer() {
462 let runner = make_runner_working();
463 runner.emit_event(SessionEvent::Joined);
464 runner.emit_event(SessionEvent::Leaving);
465
466 let drained = runner.drain_events();
467 assert_eq!(drained.len(), 2);
468 assert!(matches!(drained[0], SessionEvent::Joined));
469 assert!(matches!(drained[1], SessionEvent::Leaving));
470
471 assert!(runner.drain_events().is_empty());
473 }
474
475 #[test]
480 fn register_auto_vote_replaces_existing_entry() {
481 let mut runner = make_runner_working();
482 runner.register_auto_vote(7, Duration::from_secs(10), true);
483 let first_fire = runner.pending_auto_votes[&7].fire_at;
484
485 std::thread::sleep(Duration::from_millis(2));
488 runner.register_auto_vote(7, Duration::from_secs(20), false);
489 assert_eq!(runner.pending_auto_votes.len(), 1);
490 let entry = runner.pending_auto_votes[&7];
491 assert!(!entry.vote);
492 assert!(entry.fire_at > first_fire);
493 }
494
495 #[test]
499 fn cancel_auto_vote_removes_only_the_targeted_proposal() {
500 let mut runner = make_runner_working();
501 runner.register_auto_vote(1, Duration::from_secs(5), true);
502 runner.register_auto_vote(2, Duration::from_secs(5), false);
503 runner.register_auto_vote(3, Duration::from_secs(5), true);
504
505 runner.cancel_auto_vote(2);
506 assert!(runner.pending_auto_votes.contains_key(&1));
507 assert!(!runner.pending_auto_votes.contains_key(&2));
508 assert!(runner.pending_auto_votes.contains_key(&3));
509
510 runner.cancel_all_auto_votes();
511 assert!(runner.pending_auto_votes.is_empty());
512 }
513
514 #[test]
519 fn register_then_unregister_consensus_timeout() {
520 let mut runner = make_runner_working();
521 let before = Instant::now();
522 runner.register_consensus_timeout(42, Duration::from_secs(30));
523 let fire_at = runner.pending_consensus_timeouts[&42];
524 assert!(fire_at > before + Duration::from_secs(29));
525 assert!(fire_at < Instant::now() + Duration::from_secs(31));
526
527 runner.unregister_consensus_timeout(42);
528 assert!(!runner.pending_consensus_timeouts.contains_key(&42));
529
530 runner.unregister_consensus_timeout(999);
532 }
533}