Skip to main content

irontide_session/
apply.rs

//! Transactional `apply_settings` skeleton with phase-ordered rollback (M173 Lane B, B1).
//!
//! The naïve `apply_settings` mutates rate limiters BEFORE attempting any
//! sub-actor reconfig, so a listen-port rebind failure leaves the daemon in
//! a half-applied state: rate limiter on the new value, listener on the
//! old. M173 makes `listen_port` / `enable_dht` / `enable_lsd`
//! runtime-reconfigurable, so the partial-mutation window must close.
//!
//! This module provides the transactional skeleton: validate → snapshot →
//! ordered phases (each with a forward + rollback step) → on any failure,
//! roll back already-applied phases in REVERSE order.
//!
//! ## Phase order (forward)
//!
//! 1. **Rate limits + alert mask** (cheap, in-process; rollback = restore)
//! 2. **Listen port rebind** (TCP listener + uTP rebind + NAT refresh)
//! 3. **DHT enable/disable** (start or shut down DHT actor; persist routing
//!    table on stop; broadcast new handle to torrents)
//! 4. **LSD enable/disable** (start or shut down LSD actor; drop multicast
//!    socket on stop)
//!
//! Phase ordering matters: rate limits roll back in O(microseconds) and
//! survive any panic, so we apply them first. Listen-port rebind comes
//! before DHT/LSD because DHT routing-table announcements include the
//! listen port — flipping DHT before the new port is bound would leak the
//! old port to the network.
//!
//! ## Rollback semantics
//!
//! If phase N fails, phases 1..N-1 are rolled back in reverse order using
//! the per-phase `rollback` callback; phase N's own rollback is never run,
//! and phases after N are never started. If a rollback ITSELF fails, we log
//! at `error` level (rollback failure is a fatal architecture-level fault,
//! not something a caller can recover from) and return the original
//! `ApplyError`. The session is then in a degraded state — see the HA spec
//! "Risks" section. M173 ships the skeleton; upgrading phase-rollback
//! failures to a "session degraded" state is M174+.
//!
//! B1 ships the skeleton with stub forward/rollback callbacks for phases
//! 2-4 (no-op success). B2-B9 fill in the real sub-actor wiring.
40
41use std::sync::Arc;
42
43use parking_lot::Mutex;
44
45/// Errors that can be returned by the transactional `apply_settings` path.
46///
47/// Covers validation failure, listener bind failure, sub-actor restart
48/// failures, NAT refresh failure, and concurrent reconfig collision.
49/// Each variant is mapped onto an HTTP status by the qBt v2 setPreferences
50/// handler — see [`ApplyError::http_status`].
51#[derive(Debug, thiserror::Error)]
52pub enum ApplyError {
53    /// `Settings::validate()` rejected the new settings.
54    #[error("invalid settings: {0}")]
55    ValidationFailed(String),
56
57    /// The new `listen_port` is already in use (typically `EADDRINUSE`).
58    /// `attempted` is the port we tried to bind; `existing` is the port
59    /// the session is still listening on (rolled back).
60    #[error("listen port {attempted} in use (still listening on {existing})")]
61    ListenPortInUse {
62        /// The port that failed to bind.
63        attempted: u16,
64        /// The port the session is still listening on.
65        existing: u16,
66    },
67
68    /// DHT enable/disable failed.
69    #[error("DHT restart failed: {0}")]
70    DhtRestartFailed(String),
71
72    /// LSD enable/disable failed.
73    #[error("LSD restart failed: {0}")]
74    LsdRestartFailed(String),
75
76    /// NAT mapping refresh failed.
77    ///
78    /// **NOT FATAL** when wrapped — see [`ApplyError::is_fatal`]. Logged
79    /// as a warning; the session continues with the new `listen_port` but
80    /// without router-side mapping.
81    #[error("NAT refresh failed: {0}")]
82    NatRefreshFailed(String),
83
84    /// A second `setPreferences` (or other reconfig) raced this one and
85    /// won the in-flight guard. Caller should retry; the previous call's
86    /// effect is observable via a follow-up `getSettings` query.
87    ///
88    /// Mapped to HTTP 409 Conflict by the qBt v2 setPreferences handler.
89    #[error("concurrent reconfig in flight, retry shortly")]
90    ConcurrentReconfig,
91
92    /// A subsystem operation produced a generic I/O error.
93    #[error("I/O during reconfig: {0}")]
94    Io(String),
95}
96
97impl ApplyError {
98    /// Map this error onto the HTTP status the qBt v2 setPreferences
99    /// handler should return.
100    ///
101    /// - `ValidationFailed` → 400 Bad Request.
102    /// - `ListenPortInUse` → 409 Conflict (port collision is recoverable).
103    /// - `ConcurrentReconfig` → 409 Conflict (another caller raced).
104    /// - `DhtRestartFailed` / `LsdRestartFailed` → 500 Internal.
105    /// - `NatRefreshFailed` → 200 OK (non-fatal — caller proceeds; only
106    ///   the warning header tells them mapping refresh missed).
107    /// - `Io` → 500 Internal.
108    #[must_use]
109    pub fn http_status(&self) -> u16 {
110        match self {
111            Self::ValidationFailed(_) => 400,
112            Self::ListenPortInUse { .. } | Self::ConcurrentReconfig => 409,
113            Self::NatRefreshFailed(_) => 200,
114            Self::DhtRestartFailed(_) | Self::LsdRestartFailed(_) | Self::Io(_) => 500,
115        }
116    }
117
118    /// True if this error indicates the session is in a degraded state
119    /// after rollback (i.e. rollback itself failed).
120    ///
121    /// B1 currently always returns `false`; M174+ will introduce a
122    /// "session degraded" mode for unrecoverable rollback faults.
123    #[must_use]
124    pub const fn is_fatal(&self) -> bool {
125        false
126    }
127}
128
129/// In-flight reconfig guard. Used by [`SessionActor`] to detect concurrent
130/// `setPreferences` calls and reject the second one with
131/// [`ApplyError::ConcurrentReconfig`].
132///
133/// The guard is RAII — `lock` returns a [`ReconfigGuard`] that releases
134/// the slot on drop. If a second caller arrives while the guard is held,
135/// `try_lock` returns `None` and the caller errors out.
136///
137/// We use `Arc<Mutex<bool>>` rather than a `Semaphore(1)` because we need
138/// `try_lock` semantics (fail-fast, not block) and the body of the
139/// critical section runs `&mut self` on the actor — a Tokio Mutex would
140/// require extra await points that complicate the actor's `select!` loop.
141#[derive(Debug, Clone, Default)]
142pub struct ReconfigInFlight {
143    inner: Arc<Mutex<bool>>,
144}
145
146/// RAII guard returned by [`ReconfigInFlight::try_lock`]. Releases the
147/// slot when dropped.
148///
149/// The `'g` lifetime keeps the guard tied to the `ReconfigInFlight` so the
150/// borrow checker prevents the guard from outliving the controller (which
151/// would leak the slot permanently).
152#[must_use = "the guard must be held for the duration of the apply call"]
153pub struct ReconfigGuard<'g> {
154    parent: &'g ReconfigInFlight,
155}
156
157impl ReconfigInFlight {
158    /// Construct a fresh, unlocked guard controller.
159    #[must_use]
160    pub fn new() -> Self {
161        Self::default()
162    }
163
164    /// Try to acquire the in-flight slot.
165    ///
166    /// Returns `Some(ReconfigGuard)` on success — the guard releases the
167    /// slot on drop. Returns `None` if a concurrent call still holds the
168    /// slot (caller must error out with [`ApplyError::ConcurrentReconfig`]).
169    #[must_use]
170    pub fn try_lock(&self) -> Option<ReconfigGuard<'_>> {
171        let mut held = self.inner.lock();
172        if *held {
173            None
174        } else {
175            *held = true;
176            Some(ReconfigGuard { parent: self })
177        }
178    }
179}
180
181impl Drop for ReconfigGuard<'_> {
182    fn drop(&mut self) {
183        let mut held = self.parent.inner.lock();
184        *held = false;
185    }
186}
187
188/// Boxed forward step. Returns `Ok(())` if the phase applied cleanly, or
189/// an [`ApplyError`] that propagates up to the caller after the rollback
190/// pass.
191pub type ForwardStep<S> = Box<dyn FnOnce(&mut S) -> Result<(), ApplyError> + Send>;
192
/// Boxed rollback step.
///
/// It is invoked only if the matching forward step previously succeeded.
/// It receives the same shared state and undoes the forward mutation in
/// place.
///
/// The step has no error return, so the only way it can fail is by
/// panicking. Rollback failure is a degraded-session signal, not a
/// recoverable caller-facing error. See [`apply_phases_with_rollback`] for
/// how the executor reacts to it.
pub type RollbackStep<S> = Box<dyn FnOnce(&mut S) + Send>;
199
200/// A single phase of the transactional apply pipeline.
201///
202/// The forward step is the work done while applying; if it returns `Err`,
203/// the pipeline aborts and rolls back ALREADY-APPLIED phases in reverse.
204/// The rollback closure is invoked only if the forward step previously
205/// succeeded.
206///
207/// This is a pure data type — the executor lives in
208/// [`apply_phases_with_rollback`] and is unit-tested independently of any
209/// real subsystem.
210pub struct Phase<S> {
211    /// Human-readable name for log/error messages.
212    pub name: &'static str,
213    /// Apply step. Borrows the shared state mutably.
214    pub forward: ForwardStep<S>,
215    /// Rollback step. Only invoked if `forward` succeeded.
216    pub rollback: RollbackStep<S>,
217}
218
219/// Run a sequence of phases in order. On the first failure, roll back
220/// all already-applied phases in REVERSE order, then return the failure.
221///
222/// On success, returns `Ok(())` and all phases remain applied. On
223/// failure, the state is restored to the pre-call snapshot (modulo the
224/// guarantees of each phase's `rollback` closure).
225///
226/// # Errors
227///
228/// Returns the first phase's failure verbatim. Any rollback failures are
229/// logged at `error` level (rollback failure is a degraded-session
230/// signal, not something callers can recover from) and the ORIGINAL
231/// failure is still returned.
232pub fn apply_phases_with_rollback<S>(state: &mut S, phases: Vec<Phase<S>>) -> Result<(), ApplyError>
233where
234    S: Send,
235{
236    let mut applied: Vec<(&'static str, RollbackStep<S>)> = Vec::with_capacity(phases.len());
237    for phase in phases {
238        let Phase {
239            name,
240            forward,
241            rollback,
242        } = phase;
243        match forward(state) {
244            Ok(()) => {
245                applied.push((name, rollback));
246            }
247            Err(e) => {
248                // Roll back in reverse order.
249                while let Some((rb_name, rb)) = applied.pop() {
250                    tracing::debug!(phase = rb_name, "rolling back phase");
251                    rb(state);
252                }
253                tracing::warn!(phase = name, error = %e, "transactional apply failed, rolled back");
254                return Err(e);
255            }
256        }
257    }
258    Ok(())
259}
260
#[cfg(test)]
mod tests {
    use super::*;

    /// Mock state for phase-pipeline tests — captures the order of mutations.
    #[derive(Debug, Default)]
    struct MockState {
        rate_limit: u32,
        listen_port: u16,
        dht_enabled: bool,
        /// Reserved for future LSD-toggle phase tests (B9).
        #[allow(dead_code)]
        lsd_enabled: bool,
        events: Vec<&'static str>,
    }

    /// Box a forward/rollback closure pair into a [`Phase`] over [`MockState`].
    fn make_phase<F1, F2>(name: &'static str, forward: F1, rollback: F2) -> Phase<MockState>
    where
        F1: FnOnce(&mut MockState) -> Result<(), ApplyError> + Send + 'static,
        F2: FnOnce(&mut MockState) + Send + 'static,
    {
        Phase {
            name,
            forward: Box::new(forward),
            rollback: Box::new(rollback),
        }
    }

    #[test]
    fn empty_phase_list_succeeds_noop() {
        let mut state = MockState::default();
        let result = apply_phases_with_rollback(&mut state, Vec::new());
        assert!(result.is_ok());
        assert!(state.events.is_empty());
    }

    #[test]
    fn all_phases_succeed_in_order() {
        let mut state = MockState::default();
        let phases = vec![
            make_phase(
                "rate_limits",
                |s| {
                    s.rate_limit = 100;
                    s.events.push("rate:fwd");
                    Ok(())
                },
                |s| s.events.push("rate:rb"),
            ),
            make_phase(
                "listen_port",
                |s| {
                    s.listen_port = 6881;
                    s.events.push("port:fwd");
                    Ok(())
                },
                |s| s.events.push("port:rb"),
            ),
            make_phase(
                "dht",
                |s| {
                    s.dht_enabled = true;
                    s.events.push("dht:fwd");
                    Ok(())
                },
                |s| s.events.push("dht:rb"),
            ),
        ];
        let result = apply_phases_with_rollback(&mut state, phases);
        assert!(result.is_ok());
        assert_eq!(state.rate_limit, 100);
        assert_eq!(state.listen_port, 6881);
        assert!(state.dht_enabled);
        // No rollback events should have fired on the success path.
        assert_eq!(state.events, vec!["rate:fwd", "port:fwd", "dht:fwd"]);
    }

    #[test]
    fn third_phase_fails_first_two_rollback_in_reverse() {
        // The exact scenario the master plan REQUIRES test coverage for:
        // configure 3 fields, fail the third, assert fields 1-2 are
        // restored AND rate limiter is NOT on the new values.
        let mut state = MockState {
            rate_limit: 1000,   // pre-call value
            listen_port: 51413, // pre-call value
            dht_enabled: false, // pre-call value
            lsd_enabled: false,
            events: Vec::new(),
        };
        let pre_rate = state.rate_limit;
        let pre_port = state.listen_port;

        let phases = vec![
            make_phase(
                "rate_limits",
                |s| {
                    s.rate_limit = 100;
                    s.events.push("rate:fwd");
                    Ok(())
                },
                move |s| {
                    s.rate_limit = pre_rate; // restore pre-call value
                    s.events.push("rate:rb");
                },
            ),
            make_phase(
                "listen_port",
                |s| {
                    s.listen_port = 6881;
                    s.events.push("port:fwd");
                    Ok(())
                },
                move |s| {
                    s.listen_port = pre_port;
                    s.events.push("port:rb");
                },
            ),
            make_phase(
                "dht",
                |s| {
                    s.events.push("dht:fwd-fail");
                    Err(ApplyError::DhtRestartFailed("simulated".into()))
                },
                |_| panic!("dht rollback must NOT run if forward failed"),
            ),
        ];

        let result = apply_phases_with_rollback(&mut state, phases);

        // Error returned verbatim.
        assert!(matches!(result, Err(ApplyError::DhtRestartFailed(_))));

        // Fields 1-2 restored to pre-call values.
        assert_eq!(state.rate_limit, pre_rate);
        assert_eq!(state.listen_port, pre_port);

        // Reverse-order rollback verified by event log.
        assert_eq!(
            state.events,
            vec![
                "rate:fwd",
                "port:fwd",
                "dht:fwd-fail",
                "port:rb", // rolled back FIRST (last-applied)
                "rate:rb", // rolled back SECOND
            ]
        );
    }

    #[test]
    fn middle_phase_fails_later_phases_never_start() {
        let mut state = MockState::default();
        let phases = vec![
            make_phase(
                "rate_limits",
                |s| {
                    s.events.push("rate:fwd");
                    Ok(())
                },
                |s| s.events.push("rate:rb"),
            ),
            make_phase(
                "listen_port",
                |s| {
                    s.events.push("port:fwd-fail");
                    Err(ApplyError::ListenPortInUse {
                        attempted: 6881,
                        existing: 51413,
                    })
                },
                |_| panic!("failed phase's rollback must NOT run"),
            ),
            make_phase(
                "dht",
                |_| panic!("later forward must NOT run after a failure"),
                |_| panic!("later rollback must NOT run after a failure"),
            ),
        ];

        let result = apply_phases_with_rollback(&mut state, phases);
        assert!(matches!(
            result,
            Err(ApplyError::ListenPortInUse {
                attempted: 6881,
                existing: 51413
            })
        ));
        assert_eq!(state.events, vec!["rate:fwd", "port:fwd-fail", "rate:rb"]);
    }

    #[test]
    fn first_phase_fails_no_rollback() {
        let mut state = MockState::default();
        let phases = vec![
            make_phase(
                "rate_limits",
                |s| {
                    s.events.push("rate:fwd-fail");
                    Err(ApplyError::ValidationFailed("rate too low".into()))
                },
                |_| panic!("rollback must NOT run for failed forward"),
            ),
            make_phase(
                "listen_port",
                |_| panic!("subsequent forward must NOT run after a failure"),
                |_| panic!("subsequent rollback must NOT run after a failure"),
            ),
        ];

        let result = apply_phases_with_rollback(&mut state, phases);
        assert!(matches!(result, Err(ApplyError::ValidationFailed(_))));
        assert_eq!(state.events, vec!["rate:fwd-fail"]);
    }

    #[test]
    fn reconfig_in_flight_single_lock_releases_on_drop() {
        let guard = ReconfigInFlight::new();
        {
            let _lock = guard.try_lock().expect("first lock should succeed");
            assert!(
                guard.try_lock().is_none(),
                "second concurrent lock must fail"
            );
        }
        // After drop, the slot is free again.
        assert!(
            guard.try_lock().is_some(),
            "lock must be acquirable again after the guard is dropped"
        );
    }

    #[test]
    fn reconfig_in_flight_default_is_unlocked() {
        let g = ReconfigInFlight::default();
        assert!(g.try_lock().is_some(), "fresh guard should be unlocked");
    }

    #[test]
    fn cloned_controllers_share_the_in_flight_slot() {
        // `ReconfigInFlight` is `Clone` over a shared `Arc`, so every clone
        // must observe the same slot.
        let original = ReconfigInFlight::new();
        let clone = original.clone();
        {
            let _held = original.try_lock().expect("first lock should succeed");
            assert!(
                clone.try_lock().is_none(),
                "a clone must observe the slot held through the original"
            );
        }
        assert!(
            clone.try_lock().is_some(),
            "dropping the guard must free the slot for every clone"
        );
    }

    #[test]
    fn apply_error_http_status_classification() {
        assert_eq!(ApplyError::ValidationFailed("x".into()).http_status(), 400);
        assert_eq!(
            ApplyError::ListenPortInUse {
                attempted: 6881,
                existing: 51413
            }
            .http_status(),
            409
        );
        assert_eq!(ApplyError::ConcurrentReconfig.http_status(), 409);
        assert_eq!(ApplyError::DhtRestartFailed("d".into()).http_status(), 500);
        assert_eq!(ApplyError::LsdRestartFailed("l".into()).http_status(), 500);
        assert_eq!(ApplyError::NatRefreshFailed("n".into()).http_status(), 200);
        assert_eq!(ApplyError::Io("i".into()).http_status(), 500);
    }

    #[test]
    fn apply_error_display_messages() {
        assert_eq!(
            ApplyError::ValidationFailed("bad".into()).to_string(),
            "invalid settings: bad"
        );
        assert_eq!(
            ApplyError::ListenPortInUse {
                attempted: 6881,
                existing: 51413
            }
            .to_string(),
            "listen port 6881 in use (still listening on 51413)"
        );
        assert_eq!(
            ApplyError::ConcurrentReconfig.to_string(),
            "concurrent reconfig in flight, retry shortly"
        );
        assert_eq!(
            ApplyError::Io("eof".into()).to_string(),
            "I/O during reconfig: eof"
        );
    }

    #[test]
    fn apply_error_is_fatal_returns_false_in_b1() {
        // B1 ships always-false; M174+ will introduce session-degraded mode.
        assert!(!ApplyError::ValidationFailed("x".into()).is_fatal());
        assert!(!ApplyError::ConcurrentReconfig.is_fatal());
    }
}