// de_mls/app/session/runner.rs

//! [`SessionRunner`] struct, constructor, and the state-machine + phase-timer
//! coordinators that compose [`crate::core::ConversationHandle`] with
//! [`crate::app::PhaseTimer`] under one lock. Per-conversation method
//! bodies (proposal submission, voting, inbound dispatch, etc.) live in
//! sibling modules and extend `SessionRunner` via additional `impl` blocks.
6
7use std::{
8    collections::HashMap,
9    sync::{Arc, Mutex},
10    time::{Duration, Instant},
11};
12
13use tracing::info;
14
15use crate::{
16    app::{PhaseTimer, UserError},
17    core::{
18        ConsensusPlugin, Conversation, ConversationConfig, ConversationHandle,
19        ConversationPluginsFactory, ConversationState, ConversationStateMachine, PluginConsensus,
20        SessionEvent,
21    },
22    ds::{OutboundPacket, SharedDeliveryService},
23};
24
25/// Free helper that publishes a packet on the supplied transport. Pure sync —
26/// the caller's task does the publish directly. Multi-thread integrators
27/// that want the publish off-runtime can wrap the call site in
28/// `spawn_blocking` themselves.
29pub(crate) fn send_packet(
30    transport: &SharedDeliveryService,
31    packet: OutboundPacket,
32) -> Result<(), UserError> {
33    transport
34        .lock()
35        .map_err(|_| UserError::LockPoisoned("transport"))?
36        .publish(packet)?;
37    Ok(())
38}
39
/// One pending auto-vote: cast `vote` for `proposal_id` once the wall-clock
/// catches up to `fire_at`. Registered by `initiate_proposal` (Deferred
/// path) and `on_incoming_proposal`; cancelled on manual vote or consensus
/// resolution; fired by [`SessionRunner::tick_deadlines`].
///
/// The proposal id is not stored in the entry itself: entries live in
/// `SessionRunner::pending_auto_votes`, keyed by proposal id.
#[derive(Debug, Clone, Copy)]
pub(crate) struct AutoVoteEntry {
    /// Deadline; the entry is due once the wall clock has passed it.
    pub(crate) fire_at: Instant,
    /// The vote choice to cast when the entry fires.
    pub(crate) vote: bool,
}
51
52pub struct SessionRunner<P: ConsensusPlugin, CP: ConversationPluginsFactory> {
53    /// Conversation name. Identifies this session in the User registry and
54    /// is used to construct scope keys for consensus operations.
55    pub(crate) conversation_name: String,
56    pub(crate) handle: ConversationHandle<CP>,
57    /// Per-conversation consensus service. Owns this conversation's scope
58    /// in the shared storage and a private event bus. Constructed at
59    /// conversation creation by `User::build_consensus_service` and held
60    /// here so consensus calls hit the local service directly without
61    /// User-level lookup. `pub` so integrators can reach
62    /// `session.consensus.event_bus().subscribe()` for per-conv consensus
63    /// event forwarding.
64    pub consensus: PluginConsensus<P>,
65    /// Wall-clock anchor combined with `handle.state_machine` by
66    /// coordinator methods.
67    phase_timer: PhaseTimer,
68    /// Pending auto-votes by `proposal_id`. Walked by
69    /// [`Self::tick_deadlines`]; each entry whose `fire_at` has passed
70    /// gets a `cast_vote` and is removed from the map. Cancelled (removed)
71    /// when a manual vote arrives or the consensus session resolves.
72    pub(crate) pending_auto_votes: HashMap<u32, AutoVoteEntry>,
73    /// Pending consensus-session timeouts: `proposal_id -> fire_at`.
74    /// Registered when a proposal opens (own or incoming peer); fired by
75    /// [`Self::tick_deadlines`] which calls
76    /// `consensus.handle_consensus_timeout`. Removed when the session
77    /// resolves naturally via `apply_consensus_outcome`.
78    pub(crate) pending_consensus_timeouts: HashMap<u32, Instant>,
79    /// Synchronous outbound transport (cloned from `User`). Per-session
80    /// methods reach this via [`Self::transport`] and route through
81    /// [`send_packet`], a direct sync publish.
82    transport: SharedDeliveryService,
83    /// Identity bytes derived from `User.identity.identity_bytes()` at
84    /// session construction. Stored as `Arc<[u8]>` so hot-path session
85    /// code can clone the handle cheaply across lock-guard drops.
86    pub(crate) self_identity: Arc<[u8]>,
87    /// Display form derived from `User.identity.identity_display()` at
88    /// session construction. `Arc<str>` for the same reason as
89    /// `self_identity` — cheap clone across guard boundaries.
90    pub(crate) identity_display: Arc<str>,
91    /// Per-User instance UUID (cloned from `User`). Tagged on every
92    /// outbound packet for self-message filtering.
93    pub(crate) app_id: Arc<[u8]>,
94    /// Pending [`SessionEvent`]s waiting for a caller to drain. Interior
95    /// `Mutex` so producer-side `emit_event` stays `&self`; consumers
96    /// drain via [`Self::drain_events`] once per polling cycle.
97    pending_events: Mutex<Vec<SessionEvent>>,
98}
99
100impl<P: ConsensusPlugin, CP: ConversationPluginsFactory> SessionRunner<P, CP> {
101    /// Build a fresh runner. Creator path passes `Some(mls)`; joiner
102    /// path passes `None` and attaches the MLS service later via
103    /// `handle.attach_mls`.
104    #[allow(clippy::too_many_arguments)]
105    pub(crate) fn new(
106        conversation_name: String,
107        conversation: Conversation,
108        mls: Option<CP::Mls>,
109        state_machine: ConversationStateMachine,
110        phase_timer: PhaseTimer,
111        config: ConversationConfig,
112        scoring: CP::Scoring,
113        steward_list: CP::StewardList,
114        consensus: PluginConsensus<P>,
115        transport: SharedDeliveryService,
116        self_identity: Arc<[u8]>,
117        identity_display: Arc<str>,
118        app_id: Arc<[u8]>,
119    ) -> Self {
120        Self {
121            conversation_name,
122            handle: ConversationHandle::new(
123                conversation,
124                mls,
125                state_machine,
126                config,
127                scoring,
128                steward_list,
129            ),
130            consensus,
131            phase_timer,
132            pending_auto_votes: HashMap::new(),
133            pending_consensus_timeouts: HashMap::new(),
134            transport,
135            self_identity,
136            identity_display,
137            app_id,
138            pending_events: Mutex::new(Vec::new()),
139        }
140    }
141
142    /// Append a [`SessionEvent`] to the pending-events buffer. The caller's
143    /// polling cycle drains it via [`Self::drain_events`]. Stays `&self`
144    /// thanks to the interior [`Mutex`], so the many session-coordinator
145    /// methods that emit during a brief read guard don't need to escalate
146    /// to a write guard. Silent on poison — emit is fire-and-forget.
147    pub(crate) fn emit_event(&self, event: SessionEvent) {
148        if let Ok(mut buf) = self.pending_events.lock() {
149            buf.push(event);
150        }
151    }
152
153    /// Drain every pending [`SessionEvent`] accumulated since the last
154    /// call. Returns events in insertion order. Callers (UI fanout,
155    /// audit log) invoke this once per polling cycle.
156    pub fn drain_events(&self) -> Vec<SessionEvent> {
157        match self.pending_events.lock() {
158            Ok(mut buf) => std::mem::take(&mut *buf),
159            Err(_) => Vec::new(),
160        }
161    }
162
163    /// Borrow the session's transport without taking the runner lock —
164    /// callers that need to send while holding the runner lock briefly can
165    /// clone this and pass it to [`send_packet`] after dropping the guard.
166    pub(crate) fn transport(&self) -> &SharedDeliveryService {
167        &self.transport
168    }
169
170    // ── Pending deadlines (auto-votes + consensus timeouts) ─────────
171
172    /// Register an auto-vote to fire `delay` from now with the given
173    /// `vote` choice. Idempotent — re-registering for the same
174    /// `proposal_id` replaces the existing entry.
175    pub(crate) fn register_auto_vote(&mut self, proposal_id: u32, delay: Duration, vote: bool) {
176        self.pending_auto_votes.insert(
177            proposal_id,
178            AutoVoteEntry {
179                fire_at: Instant::now() + delay,
180                vote,
181            },
182        );
183    }
184
185    /// Drop the pending auto-vote for `proposal_id` if any is registered.
186    /// Called when a manual vote arrives (manual choice wins) or when the
187    /// consensus session resolves (vote no longer meaningful).
188    pub(crate) fn cancel_auto_vote(&mut self, proposal_id: u32) {
189        self.pending_auto_votes.remove(&proposal_id);
190    }
191
192    /// Drop every pending auto-vote on this runner. Called on conversation
193    /// leave so no stale entries fire against a conversation we've left.
194    pub(crate) fn cancel_all_auto_votes(&mut self) {
195        self.pending_auto_votes.clear();
196    }
197
198    /// Register a consensus-session timeout. Fires `delay` from now via
199    /// [`Self::tick_deadlines`]; removed naturally on consensus resolution.
200    pub(crate) fn register_consensus_timeout(&mut self, proposal_id: u32, delay: Duration) {
201        self.pending_consensus_timeouts
202            .insert(proposal_id, Instant::now() + delay);
203    }
204
205    /// Drop the pending consensus timeout for `proposal_id`. Called from
206    /// `apply_consensus_outcome` once the library reaches/fails consensus,
207    /// so the timeout can't fire a stale `handle_consensus_timeout` against
208    /// an already-resolved session.
209    pub(crate) fn unregister_consensus_timeout(&mut self, proposal_id: u32) {
210        self.pending_consensus_timeouts.remove(&proposal_id);
211    }
212
213    // ── State-machine + phase-timer coordinators ────────────────────
214
215    pub(crate) fn start_working(&mut self) -> ConversationState {
216        self.handle.state_machine.start_working();
217        self.phase_timer.clear();
218        info!(state = "Working", "state transition");
219        ConversationState::Working
220    }
221
222    pub(crate) fn start_freezing(&mut self) -> ConversationState {
223        self.handle.state_machine.start_freezing();
224        self.phase_timer.start();
225        info!(state = "Freezing", "state transition");
226        ConversationState::Freezing
227    }
228
229    /// Bypass the inactivity timer and enter Freezing immediately. Returns
230    /// `Some(Freezing)` on transition (only fires from Working or
231    /// Reelection); `None` from other states.
232    pub(crate) fn force_freezing(&mut self) -> Option<ConversationState> {
233        if self.handle.state_machine.force_freezing() {
234            self.phase_timer.start();
235            info!(state = "Freezing", "state transition (forced)");
236            Some(ConversationState::Freezing)
237        } else {
238            None
239        }
240    }
241
242    pub(crate) fn start_selection(&mut self) -> ConversationState {
243        self.handle.state_machine.start_selection();
244        info!(state = "Selection", "state transition");
245        ConversationState::Selection
246    }
247
248    pub(crate) fn start_reelection(&mut self) -> ConversationState {
249        self.handle.state_machine.start_reelection();
250        self.phase_timer.clear();
251        info!(state = "Reelection", "state transition");
252        ConversationState::Reelection
253    }
254
255    /// `true` once 3× `commit_inactivity_duration` has passed in
256    /// `PendingJoin` without a welcome.
257    pub(crate) fn is_pending_join_expired(&self) -> bool {
258        self.handle.current_state() == ConversationState::PendingJoin
259            && self
260                .phase_timer
261                .elapsed_since_anchor(self.handle.config.commit_inactivity_duration * 3)
262    }
263
264    /// `true` once the freeze window elapsed while in `Freezing`.
265    pub(crate) fn is_freeze_timed_out(&self) -> bool {
266        self.handle.current_state() == ConversationState::Freezing
267            && self
268                .phase_timer
269                .elapsed_since_anchor(self.handle.config.freeze_duration)
270    }
271
272    /// Drives the "steward waited too long to commit" transition into
273    /// `Freezing`. Call each poll tick. Returns `Some(Freezing)` exactly
274    /// on the tick that transitions; `None` while still waiting, outside
275    /// Working, or when there's no approved work. Self-starts the
276    /// inactivity anchor on the first tick with approved work.
277    pub(crate) fn check_steward_inactivity(
278        &mut self,
279        approved_proposals_count: usize,
280        inactivity_duration: Duration,
281    ) -> Option<ConversationState> {
282        if self.handle.current_state() != ConversationState::Working
283            || approved_proposals_count == 0
284        {
285            return None;
286        }
287        if self.phase_timer.started_at().is_none() {
288            self.phase_timer.start();
289            info!(
290                approved = approved_proposals_count,
291                inactivity_ms = inactivity_duration.as_millis() as u64,
292                "inactivity timer started"
293            );
294            return None;
295        }
296        if !self.phase_timer.elapsed_since_anchor(inactivity_duration) {
297            return None;
298        }
299        info!(
300            inactivity_ms = inactivity_duration.as_millis() as u64,
301            approved = approved_proposals_count,
302            "inactivity window elapsed, entering freeze"
303        );
304        Some(self.start_freezing())
305    }
306}
307
#[cfg(test)]
mod tests {
    use std::sync::Mutex;
    use std::time::Instant;

    use super::*;
    use crate::core::Conversation;
    use crate::defaults::DefaultConsensusPlugin;
    use crate::test_fixtures::{
        StubPluginsFactory, StubScoring, StubStewardList, UnusedMls, make_test_consensus_service,
    };

    /// Concrete runner type exercised by every test in this module.
    type TestRunner = SessionRunner<DefaultConsensusPlugin, StubPluginsFactory>;

    /// Build a runner for conversation `"g"` in the given state-machine
    /// state. It is wired to stub plugins (`StubStewardList::member()`), an
    /// unused transport and fixed identity bytes. The phase timer starts
    /// with no anchor.
    fn make_runner(
        state_machine: ConversationStateMachine,
        config: ConversationConfig,
    ) -> TestRunner {
        SessionRunner::new(
            "g".to_string(),
            Conversation::new("g"),
            Some(UnusedMls),
            state_machine,
            PhaseTimer::new(),
            config,
            StubScoring,
            StubStewardList::member(),
            make_test_consensus_service(),
            Arc::new(Mutex::new(crate::test_fixtures::UnusedTransport)),
            Arc::from(&b"test-identity"[..]),
            Arc::from("0xtest-display"),
            Arc::from(&[0u8; 16][..]),
        )
    }

    /// Default config with `commit_inactivity_duration` overridden.
    fn config_with_commit_inactivity(commit_inactivity: Duration) -> ConversationConfig {
        ConversationConfig {
            commit_inactivity_duration: commit_inactivity,
            ..ConversationConfig::default()
        }
    }

    /// `Instant::now()` moved `ago` into the past. `Instant - Duration`
    /// panics when the platform clock cannot represent the result, which
    /// can happen shortly after boot on some platforms. Keeping `ago` small
    /// and using `checked_sub` makes any such failure explicit.
    fn backdated(ago: Duration) -> Instant {
        Instant::now()
            .checked_sub(ago)
            .expect("monotonic clock cannot represent the backdated anchor")
    }

    fn make_runner_pending_join(commit_inactivity: Duration) -> TestRunner {
        let mut runner = make_runner(
            ConversationStateMachine::new_as_pending_join(),
            config_with_commit_inactivity(commit_inactivity),
        );
        runner.phase_timer.start();
        runner
    }

    fn make_runner_working() -> TestRunner {
        make_runner(
            ConversationStateMachine::new_as_member(),
            ConversationConfig::default(),
        )
    }

    /// `is_pending_join_expired` flips once 3× `commit_inactivity_duration`
    /// has passed since the anchor. Test backdates the anchor to avoid
    /// a real wall-clock wait.
    #[test]
    fn pending_join_expires_after_three_times_commit_inactivity() {
        // A 1 s window leaves a 0.5 s margin on the "just inside" case, so
        // scheduler jitter between backdating and checking can't flip it.
        let inactivity = Duration::from_secs(1);
        let mut runner = make_runner_pending_join(inactivity);

        assert!(
            !runner.is_pending_join_expired(),
            "fresh anchor must not be expired"
        );

        // Just inside the window: anchor 2.5× inactivity in the past.
        runner
            .phase_timer
            .set_started_at_for_test(Some(backdated(inactivity * 5 / 2)));
        assert!(
            !runner.is_pending_join_expired(),
            "before 3× boundary must not be expired"
        );

        // Past the boundary: anchor 4× inactivity in the past.
        runner
            .phase_timer
            .set_started_at_for_test(Some(backdated(inactivity * 4)));
        assert!(
            runner.is_pending_join_expired(),
            "past 3× boundary must be expired"
        );
    }

    /// Outside `PendingJoin`, `is_pending_join_expired` always returns false
    /// regardless of how old the anchor is.
    #[test]
    fn pending_join_expired_only_in_pending_join_state() {
        let inactivity = Duration::from_secs(1);
        let mut runner = make_runner(
            ConversationStateMachine::new_as_member(),
            config_with_commit_inactivity(inactivity),
        );
        assert_eq!(runner.handle.current_state(), ConversationState::Working);

        // 20× inactivity is far past the 3× boundary, so only the state check
        // can keep this from reporting expired.
        runner
            .phase_timer
            .set_started_at_for_test(Some(backdated(inactivity * 20)));
        assert!(
            !runner.is_pending_join_expired(),
            "Working state must never report pending-join-expired"
        );
    }

    /// First tick with approved work auto-anchors the timer and returns `None`.
    /// Second tick before timeout still returns `None`. State must remain Working.
    #[test]
    fn check_steward_inactivity_first_tick_anchors_and_returns_none() {
        let mut runner = make_runner_working();
        assert_eq!(runner.handle.current_state(), ConversationState::Working);
        assert!(
            runner.phase_timer.started_at().is_none(),
            "fresh runner has no anchor"
        );

        let result =
            runner.check_steward_inactivity(/* approved */ 1, Duration::from_secs(10));

        assert_eq!(result, None, "first tick auto-anchors and returns None");
        assert!(
            runner.phase_timer.started_at().is_some(),
            "anchor must be set after first tick"
        );
        assert_eq!(
            runner.handle.current_state(),
            ConversationState::Working,
            "state must stay Working until inactivity actually elapses"
        );

        let result =
            runner.check_steward_inactivity(/* approved */ 1, Duration::from_secs(10));
        assert_eq!(
            result, None,
            "second tick before timeout still returns None"
        );
    }

    /// No approved work → no anchor started, no transition.
    #[test]
    fn check_steward_inactivity_noop_without_approved_work() {
        let mut runner = make_runner_working();
        let result = runner.check_steward_inactivity(0, Duration::from_secs(10));
        assert_eq!(result, None);
        assert!(
            runner.phase_timer.started_at().is_none(),
            "no approved work must not start the timer"
        );
    }

    // ── Caller-polled deadlines + drain model ───────────────────────────

    /// `emit_event` appends and `drain_events` returns insertion-ordered.
    /// Establishes the contract relied on by integration tests that build
    /// up an event log over multiple polling cycles.
    #[test]
    fn emit_event_then_drain_returns_insertion_order_and_clears_buffer() {
        let runner = make_runner_working();
        runner.emit_event(SessionEvent::Joined);
        runner.emit_event(SessionEvent::Leaving);

        let drained = runner.drain_events();
        assert_eq!(drained.len(), 2);
        assert!(matches!(drained[0], SessionEvent::Joined));
        assert!(matches!(drained[1], SessionEvent::Leaving));

        // Second drain returns empty — buffer was cleared.
        assert!(runner.drain_events().is_empty());
    }

    /// `register_auto_vote` is idempotent — re-registering the same
    /// `proposal_id` replaces the previous entry rather than stacking.
    /// Caller relies on this when re-anchoring an auto-vote on a `Deferred`
    /// re-submit.
    #[test]
    fn register_auto_vote_replaces_existing_entry() {
        let mut runner = make_runner_working();
        runner.register_auto_vote(7, Duration::from_secs(10), true);
        let first_fire = runner.pending_auto_votes[&7].fire_at;

        // Re-register with a different `vote` and a 10 s longer delay; the
        // second insert must overwrite, not co-exist. No sleep is needed:
        // `Instant::now()` never goes backwards, so the new deadline is at
        // least 10 s after the first.
        runner.register_auto_vote(7, Duration::from_secs(20), false);
        assert_eq!(runner.pending_auto_votes.len(), 1);
        let entry = runner.pending_auto_votes[&7];
        assert!(!entry.vote);
        assert!(entry.fire_at >= first_fire + Duration::from_secs(10));
    }

    /// `cancel_auto_vote` drops one entry. `cancel_all_auto_votes` drops
    /// every entry. Both are the only paths that should remove pending
    /// auto-votes from outside `tick_deadlines`.
    #[test]
    fn cancel_auto_vote_removes_only_the_targeted_proposal() {
        let mut runner = make_runner_working();
        runner.register_auto_vote(1, Duration::from_secs(5), true);
        runner.register_auto_vote(2, Duration::from_secs(5), false);
        runner.register_auto_vote(3, Duration::from_secs(5), true);

        runner.cancel_auto_vote(2);
        assert!(runner.pending_auto_votes.contains_key(&1));
        assert!(!runner.pending_auto_votes.contains_key(&2));
        assert!(runner.pending_auto_votes.contains_key(&3));

        runner.cancel_all_auto_votes();
        assert!(runner.pending_auto_votes.is_empty());
    }

    /// `register_consensus_timeout` records `now + delay`;
    /// `unregister_consensus_timeout` drops it. `apply_consensus_outcome`
    /// uses the unregister path to drop deadlines on natural resolution
    /// so `tick_deadlines` doesn't fire a stale `handle_consensus_timeout`.
    #[test]
    fn register_then_unregister_consensus_timeout() {
        let mut runner = make_runner_working();
        let before = Instant::now();
        runner.register_consensus_timeout(42, Duration::from_secs(30));
        let fire_at = runner.pending_consensus_timeouts[&42];
        assert!(fire_at > before + Duration::from_secs(29));
        assert!(fire_at < Instant::now() + Duration::from_secs(31));

        runner.unregister_consensus_timeout(42);
        assert!(!runner.pending_consensus_timeouts.contains_key(&42));

        // Unregistering an unknown id is a no-op (no panic, no error).
        runner.unregister_consensus_timeout(999);
    }
}