irontide-session 1.0.1

BitTorrent session management: peers, torrents, and piece selection
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
//! Transactional `apply_settings` skeleton with phase-ordered rollback (M173 Lane B, B1).
//!
//! The naïve `apply_settings` mutates rate limiters BEFORE attempting any
//! sub-actor reconfig, so a listen-port rebind failure leaves the daemon in
//! a half-applied state: rate limiter on the new value, listener on the
//! old. M173 makes `listen_port` / `enable_dht` / `enable_lsd` runtime-
//! reconfigurable, so the partial-mutation window must close.
//!
//! This module provides the transactional skeleton: validate → snapshot →
//! ordered phases (each with a forward + rollback step) → on any failure,
//! roll back already-applied phases in REVERSE order.
//!
//! ## Phase order (forward)
//!
//! 1. **Rate limits + alert mask** (cheap, in-process; rollback = restore)
//! 2. **Listen port rebind** (TCP listener + uTP rebind + NAT refresh)
//! 3. **DHT enable/disable** (start or shut down DHT actor; persist routing
//!    table on stop; broadcast new handle to torrents)
//! 4. **LSD enable/disable** (start or shut down LSD actor; drop multicast
//!    socket on stop)
//!
//! Phase ordering matters: rate limits roll back in O(microseconds) and
//! survive any panic, so we apply them first. Listen-port rebind comes
//! before DHT/LSD because DHT routing-table announcements include the
//! listen port — flipping DHT before the new port is bound would leak the
//! old port to the network.
//!
//! ## Rollback semantics
//!
//! If phase N fails, phases 1..N-1 are rolled back in reverse order using
//! the per-phase `rollback` callback. If a rollback ITSELF fails, we log
//! at `error` level (rollback failure is a fatal architecture-level fault,
//! not something a caller can recover from) and return the original
//! `ApplyError`. The session is then in a degraded state — see the HA
//! spec, "Risks" section. M173 ships only the skeleton; promoting rollback
//! failures to an explicit "session degraded" state is deferred to M174+.
//!
//! B1 ships the skeleton with stub forward/rollback callbacks for phases
//! 2-4 (no-op success). B2-B9 fill in the real sub-actor wiring.

use std::sync::Arc;

use parking_lot::Mutex;

/// Errors that can be returned by the transactional `apply_settings` path.
///
/// Covers validation failure, listener bind failure, sub-actor restart
/// failures, NAT refresh failure, concurrent reconfig collision, and a
/// catch-all for I/O. The `#[error(...)]` strings are the `Display` output
/// generated by `thiserror`.
///
/// Each variant maps onto an HTTP status for the qBt v2 `setPreferences`
/// handler — see [`ApplyError::http_status`].
#[derive(Debug, thiserror::Error)]
pub enum ApplyError {
    /// `Settings::validate()` rejected the new settings. The payload is the
    /// free-form rejection reason, interpolated into the message.
    #[error("invalid settings: {0}")]
    ValidationFailed(String),

    /// The new `listen_port` is already in use (typically `EADDRINUSE`).
    /// `attempted` is the port we tried to bind; `existing` is the port
    /// the session is still listening on after the rollback.
    #[error("listen port {attempted} in use (still listening on {existing})")]
    ListenPortInUse {
        /// The port that failed to bind.
        attempted: u16,
        /// The port the session is still listening on.
        existing: u16,
    },

    /// Enabling or disabling the DHT actor failed.
    #[error("DHT restart failed: {0}")]
    DhtRestartFailed(String),

    /// Enabling or disabling the LSD actor failed.
    #[error("LSD restart failed: {0}")]
    LsdRestartFailed(String),

    /// NAT mapping refresh failed.
    ///
    /// **NOT FATAL**: [`ApplyError::is_fatal`] returns `false` and
    /// [`ApplyError::http_status`] maps it to 200. Intended to surface as a
    /// warning while the new `listen_port` stays in effect without a
    /// router-side mapping (caller-side handling — confirm in the handler).
    #[error("NAT refresh failed: {0}")]
    NatRefreshFailed(String),

    /// Another `setPreferences` (or other reconfig) already holds the
    /// in-flight guard ([`ReconfigInFlight`]), so this call was rejected.
    /// Caller should retry; the winning call's effect is observable via a
    /// follow-up `getSettings` query.
    ///
    /// Mapped to HTTP 409 Conflict by [`ApplyError::http_status`].
    #[error("concurrent reconfig in flight, retry shortly")]
    ConcurrentReconfig,

    /// A subsystem operation produced a generic I/O error (stringified).
    #[error("I/O during reconfig: {0}")]
    Io(String),
}

impl ApplyError {
    /// HTTP status code the qBt v2 `setPreferences` handler should answer
    /// with for this error.
    ///
    /// - 200 OK — `NatRefreshFailed` (non-fatal: the caller proceeds and
    ///   only a warning header reports that the mapping refresh missed).
    /// - 400 Bad Request — `ValidationFailed`.
    /// - 409 Conflict — `ListenPortInUse` (a recoverable port collision) and
    ///   `ConcurrentReconfig` (another caller raced this one).
    /// - 500 Internal Server Error — `DhtRestartFailed`, `LsdRestartFailed`
    ///   and `Io`.
    #[must_use]
    pub fn http_status(&self) -> u16 {
        match self {
            Self::NatRefreshFailed(_) => 200,
            Self::ValidationFailed(_) => 400,
            Self::ConcurrentReconfig | Self::ListenPortInUse { .. } => 409,
            Self::Io(_) | Self::LsdRestartFailed(_) | Self::DhtRestartFailed(_) => 500,
        }
    }

    /// Whether this error means the session is left degraded because a
    /// rollback itself failed.
    ///
    /// Always `false` in B1: no variant represents a rollback failure yet.
    /// A dedicated "session degraded" mode is planned for M174+.
    #[must_use]
    pub const fn is_fatal(&self) -> bool {
        false
    }
}

/// In-flight reconfig guard. Lets the session detect a concurrent
/// `setPreferences` call and reject it with
/// [`ApplyError::ConcurrentReconfig`] instead of interleaving two reconfigs.
///
/// Acquisition is RAII: [`ReconfigInFlight::try_lock`] returns a
/// [`ReconfigGuard`] that releases the slot on drop. If a second caller
/// arrives while the guard is held, `try_lock` returns `None` and the
/// caller errors out. Clones share one slot, because the flag lives behind
/// an `Arc`.
///
/// We use `Arc<Mutex<bool>>` rather than a `Semaphore(1)` because we need
/// `try_lock` semantics (fail-fast, not block) and the body of the
/// critical section runs `&mut self` on the session actor
/// (`SessionActor`). A Tokio Mutex would require extra await points that
/// complicate the actor's `select!` loop.
#[derive(Debug, Clone, Default)]
pub struct ReconfigInFlight {
    /// `true` while a [`ReconfigGuard`] for this slot is alive.
    inner: Arc<Mutex<bool>>,
}

/// RAII guard returned by [`ReconfigInFlight::try_lock`]. Releases the
/// slot when dropped.
///
/// The guard borrows the `ReconfigInFlight` handle it was acquired from
/// (the `'g` lifetime). The borrow checker therefore keeps that handle
/// alive for as long as the slot is held, and the release in `Drop` always
/// has a valid handle to write through.
#[derive(Debug)]
#[must_use = "the guard must be held for the duration of the apply call"]
pub struct ReconfigGuard<'g> {
    /// Controller whose slot this guard clears on drop.
    parent: &'g ReconfigInFlight,
}

impl ReconfigInFlight {
    /// Construct a fresh, unlocked guard controller.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Try to acquire the in-flight slot.
    ///
    /// Returns `Some(ReconfigGuard)` on success — the guard releases the
    /// slot on drop. Returns `None` if a concurrent call still holds the
    /// slot (caller must error out with [`ApplyError::ConcurrentReconfig`]).
    #[must_use]
    pub fn try_lock(&self) -> Option<ReconfigGuard<'_>> {
        let mut held = self.inner.lock();
        if *held {
            None
        } else {
            *held = true;
            Some(ReconfigGuard { parent: self })
        }
    }
}

impl Drop for ReconfigGuard<'_> {
    /// Hand the in-flight slot back so the next `try_lock` can claim it.
    fn drop(&mut self) {
        *self.parent.inner.lock() = false;
    }
}

/// Boxed forward step of a [`Phase`].
///
/// Applies one phase's mutation to the shared state. `Ok(())` marks the
/// phase as applied. An [`ApplyError`] aborts the pipeline and is handed
/// back to the caller once the already-applied phases have been rolled
/// back. The closure must be `Send`.
pub type ForwardStep<S> = Box<dyn Send + FnOnce(&mut S) -> Result<(), ApplyError>>;

/// Boxed rollback step of a [`Phase`].
///
/// Invoked only if the matching forward step previously succeeded. It
/// receives the same shared state and undoes the forward mutation in place.
///
/// The step returns `()`, so it has no error channel and cannot report
/// failure except by panicking. How such a panic is handled is up to the
/// executor, [`apply_phases_with_rollback`]. A failed rollback is a
/// degraded-session signal, not a recoverable caller-facing error.
pub type RollbackStep<S> = Box<dyn FnOnce(&mut S) + Send>;

/// A single phase of the transactional apply pipeline.
///
/// The forward step is the work done while applying. If it returns `Err`,
/// the pipeline aborts and rolls back the ALREADY-APPLIED phases in reverse.
/// The rollback closure is invoked only if the forward step previously
/// succeeded.
///
/// This is a pure data type. The executor lives in
/// [`apply_phases_with_rollback`] and is unit-tested independently of any
/// real subsystem.
pub struct Phase<S> {
    /// Human-readable name for log/error messages.
    pub name: &'static str,
    /// Apply step. Borrows the shared state mutably.
    pub forward: ForwardStep<S>,
    /// Rollback step. Only invoked if `forward` succeeded.
    pub rollback: RollbackStep<S>,
}

/// Shows only `name`; the boxed closures are opaque and are elided as `..`.
///
/// Written by hand because `#[derive(Debug)]` would require `S: Debug` and
/// `Debug` impls for the boxed closures, which closures do not have.
impl<S> std::fmt::Debug for Phase<S> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Phase")
            .field("name", &self.name)
            .finish_non_exhaustive()
    }
}

/// Run a sequence of phases in order. On the first failure, roll back
/// all already-applied phases in REVERSE order, then return the failure.
///
/// On success, returns `Ok(())` and all phases remain applied. On
/// failure, the state is restored to the pre-call snapshot (modulo the
/// guarantees of each phase's `rollback` closure).
///
/// # Errors
///
/// Returns the first phase's failure verbatim. Any rollback failures are
/// logged at `error` level (rollback failure is a degraded-session
/// signal, not something callers can recover from) and the ORIGINAL
/// failure is still returned.
pub fn apply_phases_with_rollback<S>(state: &mut S, phases: Vec<Phase<S>>) -> Result<(), ApplyError>
where
    S: Send,
{
    let mut applied: Vec<(&'static str, RollbackStep<S>)> = Vec::with_capacity(phases.len());
    for phase in phases {
        let Phase {
            name,
            forward,
            rollback,
        } = phase;
        match forward(state) {
            Ok(()) => {
                applied.push((name, rollback));
            }
            Err(e) => {
                // Roll back in reverse order.
                while let Some((rb_name, rb)) = applied.pop() {
                    tracing::debug!(phase = rb_name, "rolling back phase");
                    rb(state);
                }
                tracing::warn!(phase = name, error = %e, "transactional apply failed, rolled back");
                return Err(e);
            }
        }
    }
    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Mock state for phase-pipeline tests — captures the order of mutations.
    #[derive(Debug, Default)]
    struct MockState {
        rate_limit: u32,
        listen_port: u16,
        dht_enabled: bool,
        /// Reserved for future LSD-toggle phase tests (B9).
        #[allow(dead_code)]
        lsd_enabled: bool,
        events: Vec<&'static str>,
    }

    fn make_phase<F1, F2>(name: &'static str, forward: F1, rollback: F2) -> Phase<MockState>
    where
        F1: FnOnce(&mut MockState) -> Result<(), ApplyError> + Send + 'static,
        F2: FnOnce(&mut MockState) + Send + 'static,
    {
        Phase {
            name,
            forward: Box::new(forward),
            rollback: Box::new(rollback),
        }
    }

    /// Every `ApplyError` variant, for tests that must cover all of them.
    fn all_error_variants() -> Vec<ApplyError> {
        vec![
            ApplyError::ValidationFailed("x".into()),
            ApplyError::ListenPortInUse {
                attempted: 6881,
                existing: 51413,
            },
            ApplyError::DhtRestartFailed("d".into()),
            ApplyError::LsdRestartFailed("l".into()),
            ApplyError::NatRefreshFailed("n".into()),
            ApplyError::ConcurrentReconfig,
            ApplyError::Io("i".into()),
        ]
    }

    #[test]
    fn empty_phase_list_succeeds_noop() {
        let mut state = MockState::default();
        let result = apply_phases_with_rollback(&mut state, Vec::new());
        assert!(result.is_ok());
        assert!(state.events.is_empty());
    }

    #[test]
    fn all_phases_succeed_in_order() {
        let mut state = MockState::default();
        let phases = vec![
            make_phase(
                "rate_limits",
                |s| {
                    s.rate_limit = 100;
                    s.events.push("rate:fwd");
                    Ok(())
                },
                |s| s.events.push("rate:rb"),
            ),
            make_phase(
                "listen_port",
                |s| {
                    s.listen_port = 6881;
                    s.events.push("port:fwd");
                    Ok(())
                },
                |s| s.events.push("port:rb"),
            ),
            make_phase(
                "dht",
                |s| {
                    s.dht_enabled = true;
                    s.events.push("dht:fwd");
                    Ok(())
                },
                |s| s.events.push("dht:rb"),
            ),
        ];
        let result = apply_phases_with_rollback(&mut state, phases);
        assert!(result.is_ok());
        assert_eq!(state.rate_limit, 100);
        assert_eq!(state.listen_port, 6881);
        assert!(state.dht_enabled);
        // No rollback events should have fired on the success path.
        assert_eq!(state.events, vec!["rate:fwd", "port:fwd", "dht:fwd"]);
    }

    #[test]
    fn third_phase_fails_first_two_rollback_in_reverse() {
        // The exact scenario the master plan REQUIRES test coverage for:
        // configure 3 fields, fail the third, assert fields 1-2 are
        // restored AND rate limiter is NOT on the new values.
        let mut state = MockState {
            rate_limit: 1000,   // pre-call value
            listen_port: 51413, // pre-call value
            dht_enabled: false, // pre-call value
            lsd_enabled: false,
            events: Vec::new(),
        };
        let pre_rate = state.rate_limit;
        let pre_port = state.listen_port;

        let phases = vec![
            make_phase(
                "rate_limits",
                |s| {
                    s.rate_limit = 100;
                    s.events.push("rate:fwd");
                    Ok(())
                },
                move |s| {
                    s.rate_limit = pre_rate; // restore pre-call value
                    s.events.push("rate:rb");
                },
            ),
            make_phase(
                "listen_port",
                |s| {
                    s.listen_port = 6881;
                    s.events.push("port:fwd");
                    Ok(())
                },
                move |s| {
                    s.listen_port = pre_port;
                    s.events.push("port:rb");
                },
            ),
            make_phase(
                "dht",
                |s| {
                    s.events.push("dht:fwd-fail");
                    Err(ApplyError::DhtRestartFailed("simulated".into()))
                },
                |_| panic!("dht rollback must NOT run if forward failed"),
            ),
        ];

        let result = apply_phases_with_rollback(&mut state, phases);

        // Error returned verbatim.
        assert!(matches!(result, Err(ApplyError::DhtRestartFailed(_))));

        // Fields 1-2 restored to pre-call values.
        assert_eq!(state.rate_limit, pre_rate);
        assert_eq!(state.listen_port, pre_port);

        // Reverse-order rollback verified by event log.
        assert_eq!(
            state.events,
            vec![
                "rate:fwd",
                "port:fwd",
                "dht:fwd-fail",
                "port:rb", // rolled back FIRST (last-applied)
                "rate:rb", // rolled back SECOND
            ]
        );
    }

    #[test]
    fn middle_phase_failure_skips_later_phases() {
        let mut state = MockState::default();
        let phases = vec![
            make_phase(
                "rate_limits",
                |s| {
                    s.events.push("rate:fwd");
                    Ok(())
                },
                |s| s.events.push("rate:rb"),
            ),
            make_phase(
                "listen_port",
                |s| {
                    s.events.push("port:fwd-fail");
                    Err(ApplyError::ListenPortInUse {
                        attempted: 6881,
                        existing: 51413,
                    })
                },
                |_| panic!("rollback must NOT run for failed forward"),
            ),
            make_phase(
                "dht",
                |_| panic!("forward after a failure must NOT run"),
                |_| panic!("rollback after a failure must NOT run"),
            ),
        ];

        let result = apply_phases_with_rollback(&mut state, phases);
        assert!(matches!(
            result,
            Err(ApplyError::ListenPortInUse {
                attempted: 6881,
                existing: 51413
            })
        ));
        assert_eq!(state.events, vec!["rate:fwd", "port:fwd-fail", "rate:rb"]);
    }

    #[test]
    fn first_phase_fails_no_rollback() {
        let mut state = MockState::default();
        let phases = vec![
            make_phase(
                "rate_limits",
                |s| {
                    s.events.push("rate:fwd-fail");
                    Err(ApplyError::ValidationFailed("rate too low".into()))
                },
                |_| panic!("rollback must NOT run for failed forward"),
            ),
            make_phase(
                "listen_port",
                |_| panic!("subsequent forward must NOT run after a failure"),
                |_| panic!("subsequent rollback must NOT run after a failure"),
            ),
        ];

        let result = apply_phases_with_rollback(&mut state, phases);
        assert!(matches!(result, Err(ApplyError::ValidationFailed(_))));
        assert_eq!(state.events, vec!["rate:fwd-fail"]);
    }

    #[test]
    fn reconfig_in_flight_single_lock_releases_on_drop() {
        let guard = ReconfigInFlight::new();
        {
            let _lock = guard.try_lock().expect("first lock should succeed");
            assert!(
                guard.try_lock().is_none(),
                "second concurrent lock must fail"
            );
        }
        // After drop, the slot is free again.
        assert!(
            guard.try_lock().is_some(),
            "lock must be acquirable again after the guard is dropped"
        );
    }

    #[test]
    fn reconfig_in_flight_default_is_unlocked() {
        let g = ReconfigInFlight::default();
        assert!(g.try_lock().is_some(), "fresh guard should be unlocked");
    }

    #[test]
    fn reconfig_in_flight_clones_share_one_slot() {
        let original = ReconfigInFlight::new();
        let clone = original.clone();
        let _held = original.try_lock().expect("first lock should succeed");
        assert!(
            clone.try_lock().is_none(),
            "a clone must observe the slot held through the original"
        );
    }

    #[test]
    fn apply_error_http_status_classification() {
        assert_eq!(ApplyError::ValidationFailed("x".into()).http_status(), 400);
        assert_eq!(
            ApplyError::ListenPortInUse {
                attempted: 6881,
                existing: 51413
            }
            .http_status(),
            409
        );
        assert_eq!(ApplyError::ConcurrentReconfig.http_status(), 409);
        assert_eq!(ApplyError::DhtRestartFailed("d".into()).http_status(), 500);
        assert_eq!(ApplyError::LsdRestartFailed("l".into()).http_status(), 500);
        assert_eq!(ApplyError::NatRefreshFailed("n".into()).http_status(), 200);
        assert_eq!(ApplyError::Io("i".into()).http_status(), 500);
    }

    #[test]
    fn apply_error_is_fatal_returns_false_in_b1() {
        // B1 ships always-false; M174+ will introduce session-degraded mode.
        for err in all_error_variants() {
            assert!(!err.is_fatal(), "{:?} must not be fatal in B1", err);
        }
    }

    #[test]
    fn apply_error_display_messages() {
        assert_eq!(
            ApplyError::ListenPortInUse {
                attempted: 6881,
                existing: 51413
            }
            .to_string(),
            "listen port 6881 in use (still listening on 51413)"
        );
        assert_eq!(
            ApplyError::ConcurrentReconfig.to_string(),
            "concurrent reconfig in flight, retry shortly"
        );
        assert_eq!(
            ApplyError::ValidationFailed("bad".into()).to_string(),
            "invalid settings: bad"
        );
        assert_eq!(
            ApplyError::NatRefreshFailed("timeout".into()).to_string(),
            "NAT refresh failed: timeout"
        );
    }
}