irontide_session/apply.rs
1//! Transactional `apply_settings` skeleton with phase-ordered rollback (M173 Lane B, B1).
2//!
3//! The naïve `apply_settings` mutates rate limiters BEFORE attempting any
4//! sub-actor reconfig, so a listen-port rebind failure leaves the daemon in
5//! a half-applied state: rate limiter on the new value, listener on the
6//! old. M173 makes `listen_port` / `enable_dht` / `enable_lsd` runtime-
7//! reconfigurable, so the partial-mutation window must close.
8//!
9//! This module provides the transactional skeleton: validate → snapshot →
10//! ordered phases (each with a forward + rollback step) → on any failure,
11//! roll back already-applied phases in REVERSE order.
12//!
13//! ## Phase order (forward)
14//!
15//! 1. **Rate limits + alert mask** (cheap, in-process; rollback = restore)
16//! 2. **Listen port rebind** (TCP listener + uTP rebind + NAT refresh)
17//! 3. **DHT enable/disable** (start or shut down DHT actor; persist routing
18//! table on stop; broadcast new handle to torrents)
19//! 4. **LSD enable/disable** (start or shut down LSD actor; drop multicast
20//! socket on stop)
21//!
22//! Phase ordering matters: rate limits roll back in O(microseconds) and
23//! survive any panic, so we apply them first. Listen-port rebind comes
24//! before DHT/LSD because DHT routing-table announcements include the
25//! listen port — flipping DHT before the new port is bound would leak the
26//! old port to the network.
27//!
28//! ## Rollback semantics
29//!
30//! If phase N fails, phases 1..N-1 are rolled back in reverse order using
31//! the per-phase `rollback` callback. If a rollback ITSELF fails, we log
32//! at `error` level (rollback failure is a fatal architecture-level fault,
33//! not something a caller can recover from) and return the original
34//! `ApplyError`. The session is then in a degraded state — see HA spec
//! "Risks" section. M173 ships only the skeleton; promoting phase-rollback
//! failures to a "session degraded" state is deferred to M174+.
37//!
38//! B1 ships the skeleton with stub forward/rollback callbacks for phases
39//! 2-4 (no-op success). B2-B9 fill in the real sub-actor wiring.
40
41use std::sync::Arc;
42
43use parking_lot::Mutex;
44
45/// Errors that can be returned by the transactional `apply_settings` path.
46///
47/// Covers validation failure, listener bind failure, sub-actor restart
48/// failures, NAT refresh failure, and concurrent reconfig collision.
49/// Each variant is mapped onto an HTTP status by the qBt v2 setPreferences
50/// handler — see [`ApplyError::http_status`].
51#[derive(Debug, thiserror::Error)]
52pub enum ApplyError {
53 /// `Settings::validate()` rejected the new settings.
54 #[error("invalid settings: {0}")]
55 ValidationFailed(String),
56
57 /// The new `listen_port` is already in use (typically `EADDRINUSE`).
58 /// `attempted` is the port we tried to bind; `existing` is the port
59 /// the session is still listening on (rolled back).
60 #[error("listen port {attempted} in use (still listening on {existing})")]
61 ListenPortInUse {
62 /// The port that failed to bind.
63 attempted: u16,
64 /// The port the session is still listening on.
65 existing: u16,
66 },
67
68 /// DHT enable/disable failed.
69 #[error("DHT restart failed: {0}")]
70 DhtRestartFailed(String),
71
72 /// LSD enable/disable failed.
73 #[error("LSD restart failed: {0}")]
74 LsdRestartFailed(String),
75
76 /// NAT mapping refresh failed.
77 ///
78 /// **NOT FATAL** when wrapped — see [`ApplyError::is_fatal`]. Logged
79 /// as a warning; the session continues with the new `listen_port` but
80 /// without router-side mapping.
81 #[error("NAT refresh failed: {0}")]
82 NatRefreshFailed(String),
83
84 /// A second `setPreferences` (or other reconfig) raced this one and
85 /// won the in-flight guard. Caller should retry; the previous call's
86 /// effect is observable via a follow-up `getSettings` query.
87 ///
88 /// Mapped to HTTP 409 Conflict by the qBt v2 setPreferences handler.
89 #[error("concurrent reconfig in flight, retry shortly")]
90 ConcurrentReconfig,
91
92 /// A subsystem operation produced a generic I/O error.
93 #[error("I/O during reconfig: {0}")]
94 Io(String),
95}
96
97impl ApplyError {
98 /// Map this error onto the HTTP status the qBt v2 setPreferences
99 /// handler should return.
100 ///
101 /// - `ValidationFailed` → 400 Bad Request.
102 /// - `ListenPortInUse` → 409 Conflict (port collision is recoverable).
103 /// - `ConcurrentReconfig` → 409 Conflict (another caller raced).
104 /// - `DhtRestartFailed` / `LsdRestartFailed` → 500 Internal.
105 /// - `NatRefreshFailed` → 200 OK (non-fatal — caller proceeds; only
106 /// the warning header tells them mapping refresh missed).
107 /// - `Io` → 500 Internal.
108 #[must_use]
109 pub fn http_status(&self) -> u16 {
110 match self {
111 Self::ValidationFailed(_) => 400,
112 Self::ListenPortInUse { .. } | Self::ConcurrentReconfig => 409,
113 Self::NatRefreshFailed(_) => 200,
114 Self::DhtRestartFailed(_) | Self::LsdRestartFailed(_) | Self::Io(_) => 500,
115 }
116 }
117
118 /// True if this error indicates the session is in a degraded state
119 /// after rollback (i.e. rollback itself failed).
120 ///
121 /// B1 currently always returns `false`; M174+ will introduce a
122 /// "session degraded" mode for unrecoverable rollback faults.
123 #[must_use]
124 pub const fn is_fatal(&self) -> bool {
125 false
126 }
127}
128
129/// In-flight reconfig guard. Used by [`SessionActor`] to detect concurrent
130/// `setPreferences` calls and reject the second one with
131/// [`ApplyError::ConcurrentReconfig`].
132///
133/// The guard is RAII — `lock` returns a [`ReconfigGuard`] that releases
134/// the slot on drop. If a second caller arrives while the guard is held,
135/// `try_lock` returns `None` and the caller errors out.
136///
137/// We use `Arc<Mutex<bool>>` rather than a `Semaphore(1)` because we need
138/// `try_lock` semantics (fail-fast, not block) and the body of the
139/// critical section runs `&mut self` on the actor — a Tokio Mutex would
140/// require extra await points that complicate the actor's `select!` loop.
141#[derive(Debug, Clone, Default)]
142pub struct ReconfigInFlight {
143 inner: Arc<Mutex<bool>>,
144}
145
146/// RAII guard returned by [`ReconfigInFlight::try_lock`]. Releases the
147/// slot when dropped.
148///
149/// The `'g` lifetime keeps the guard tied to the `ReconfigInFlight` so the
150/// borrow checker prevents the guard from outliving the controller (which
151/// would leak the slot permanently).
152#[must_use = "the guard must be held for the duration of the apply call"]
153pub struct ReconfigGuard<'g> {
154 parent: &'g ReconfigInFlight,
155}
156
157impl ReconfigInFlight {
158 /// Construct a fresh, unlocked guard controller.
159 #[must_use]
160 pub fn new() -> Self {
161 Self::default()
162 }
163
164 /// Try to acquire the in-flight slot.
165 ///
166 /// Returns `Some(ReconfigGuard)` on success — the guard releases the
167 /// slot on drop. Returns `None` if a concurrent call still holds the
168 /// slot (caller must error out with [`ApplyError::ConcurrentReconfig`]).
169 #[must_use]
170 pub fn try_lock(&self) -> Option<ReconfigGuard<'_>> {
171 let mut held = self.inner.lock();
172 if *held {
173 None
174 } else {
175 *held = true;
176 Some(ReconfigGuard { parent: self })
177 }
178 }
179}
180
181impl Drop for ReconfigGuard<'_> {
182 fn drop(&mut self) {
183 let mut held = self.parent.inner.lock();
184 *held = false;
185 }
186}
187
188/// Boxed forward step. Returns `Ok(())` if the phase applied cleanly, or
189/// an [`ApplyError`] that propagates up to the caller after the rollback
190/// pass.
191pub type ForwardStep<S> = Box<dyn FnOnce(&mut S) -> Result<(), ApplyError> + Send>;
192
/// Rollback half of a [`Phase`], boxed alongside its [`ForwardStep`].
///
/// Invoked only if the matching forward step previously succeeded. It
/// receives the same shared state and should undo that forward mutation in
/// place.
///
/// The signature has no error channel, so a rollback step cannot report
/// failure through its return value; the only way it can fail is by
/// panicking. How such a panic is handled is decided by the executor,
/// [`apply_phases_with_rollback`].
pub type RollbackStep<S> = Box<dyn FnOnce(&mut S) + Send>;
199
200/// A single phase of the transactional apply pipeline.
201///
202/// The forward step is the work done while applying; if it returns `Err`,
203/// the pipeline aborts and rolls back ALREADY-APPLIED phases in reverse.
204/// The rollback closure is invoked only if the forward step previously
205/// succeeded.
206///
207/// This is a pure data type — the executor lives in
208/// [`apply_phases_with_rollback`] and is unit-tested independently of any
209/// real subsystem.
210pub struct Phase<S> {
211 /// Human-readable name for log/error messages.
212 pub name: &'static str,
213 /// Apply step. Borrows the shared state mutably.
214 pub forward: ForwardStep<S>,
215 /// Rollback step. Only invoked if `forward` succeeded.
216 pub rollback: RollbackStep<S>,
217}
218
219/// Run a sequence of phases in order. On the first failure, roll back
220/// all already-applied phases in REVERSE order, then return the failure.
221///
222/// On success, returns `Ok(())` and all phases remain applied. On
223/// failure, the state is restored to the pre-call snapshot (modulo the
224/// guarantees of each phase's `rollback` closure).
225///
226/// # Errors
227///
228/// Returns the first phase's failure verbatim. Any rollback failures are
229/// logged at `error` level (rollback failure is a degraded-session
230/// signal, not something callers can recover from) and the ORIGINAL
231/// failure is still returned.
232pub fn apply_phases_with_rollback<S>(state: &mut S, phases: Vec<Phase<S>>) -> Result<(), ApplyError>
233where
234 S: Send,
235{
236 let mut applied: Vec<(&'static str, RollbackStep<S>)> = Vec::with_capacity(phases.len());
237 for phase in phases {
238 let Phase {
239 name,
240 forward,
241 rollback,
242 } = phase;
243 match forward(state) {
244 Ok(()) => {
245 applied.push((name, rollback));
246 }
247 Err(e) => {
248 // Roll back in reverse order.
249 while let Some((rb_name, rb)) = applied.pop() {
250 tracing::debug!(phase = rb_name, "rolling back phase");
251 rb(state);
252 }
253 tracing::warn!(phase = name, error = %e, "transactional apply failed, rolled back");
254 return Err(e);
255 }
256 }
257 }
258 Ok(())
259}
260
#[cfg(test)]
mod tests {
    use super::*;

    /// Mock state for phase-pipeline tests — captures the order of mutations.
    #[derive(Debug, Default)]
    struct MockState {
        rate_limit: u32,
        listen_port: u16,
        dht_enabled: bool,
        /// Reserved for future LSD-toggle phase tests (B9).
        #[allow(dead_code)]
        lsd_enabled: bool,
        events: Vec<&'static str>,
    }

    /// Box a forward/rollback closure pair into a [`Phase`] over [`MockState`].
    fn make_phase<F1, F2>(name: &'static str, forward: F1, rollback: F2) -> Phase<MockState>
    where
        F1: FnOnce(&mut MockState) -> Result<(), ApplyError> + Send + 'static,
        F2: FnOnce(&mut MockState) + Send + 'static,
    {
        Phase {
            name,
            forward: Box::new(forward),
            rollback: Box::new(rollback),
        }
    }

    #[test]
    fn empty_phase_list_succeeds_noop() {
        let mut state = MockState::default();
        let result = apply_phases_with_rollback(&mut state, Vec::new());
        assert!(result.is_ok());
        assert!(state.events.is_empty());
    }

    #[test]
    fn all_phases_succeed_in_order() {
        let mut state = MockState::default();
        let phases = vec![
            make_phase(
                "rate_limits",
                |s| {
                    s.rate_limit = 100;
                    s.events.push("rate:fwd");
                    Ok(())
                },
                |s| s.events.push("rate:rb"),
            ),
            make_phase(
                "listen_port",
                |s| {
                    s.listen_port = 6881;
                    s.events.push("port:fwd");
                    Ok(())
                },
                |s| s.events.push("port:rb"),
            ),
            make_phase(
                "dht",
                |s| {
                    s.dht_enabled = true;
                    s.events.push("dht:fwd");
                    Ok(())
                },
                |s| s.events.push("dht:rb"),
            ),
        ];
        let result = apply_phases_with_rollback(&mut state, phases);
        assert!(result.is_ok());
        assert_eq!(state.rate_limit, 100);
        assert_eq!(state.listen_port, 6881);
        assert!(state.dht_enabled);
        // No rollback events should have fired on the success path.
        assert_eq!(state.events, vec!["rate:fwd", "port:fwd", "dht:fwd"]);
    }

    #[test]
    fn third_phase_fails_first_two_rollback_in_reverse() {
        // The exact scenario the master plan REQUIRES test coverage for:
        // configure 3 fields, fail the third, assert fields 1-2 are
        // restored AND rate limiter is NOT on the new values.
        let mut state = MockState {
            rate_limit: 1000,   // pre-call value
            listen_port: 51413, // pre-call value
            dht_enabled: false, // pre-call value
            lsd_enabled: false,
            events: Vec::new(),
        };
        let pre_rate = state.rate_limit;
        let pre_port = state.listen_port;

        let phases = vec![
            make_phase(
                "rate_limits",
                |s| {
                    s.rate_limit = 100;
                    s.events.push("rate:fwd");
                    Ok(())
                },
                move |s| {
                    s.rate_limit = pre_rate; // restore pre-call value
                    s.events.push("rate:rb");
                },
            ),
            make_phase(
                "listen_port",
                |s| {
                    s.listen_port = 6881;
                    s.events.push("port:fwd");
                    Ok(())
                },
                move |s| {
                    s.listen_port = pre_port;
                    s.events.push("port:rb");
                },
            ),
            make_phase(
                "dht",
                |s| {
                    s.events.push("dht:fwd-fail");
                    Err(ApplyError::DhtRestartFailed("simulated".into()))
                },
                // Record before panicking so the event-log assertion catches
                // an erroneous invocation even if the executor swallows
                // rollback panics.
                |s| {
                    s.events.push("dht:rb-UNEXPECTED");
                    panic!("dht rollback must NOT run if forward failed")
                },
            ),
        ];

        let result = apply_phases_with_rollback(&mut state, phases);

        // Error returned verbatim.
        assert!(matches!(result, Err(ApplyError::DhtRestartFailed(_))));

        // Fields 1-2 restored to pre-call values.
        assert_eq!(state.rate_limit, pre_rate);
        assert_eq!(state.listen_port, pre_port);

        // Reverse-order rollback verified by event log.
        assert_eq!(
            state.events,
            vec![
                "rate:fwd",
                "port:fwd",
                "dht:fwd-fail",
                "port:rb", // rolled back FIRST (last-applied)
                "rate:rb", // rolled back SECOND
            ]
        );
    }

    #[test]
    fn middle_phase_failure_skips_later_phases() {
        let mut state = MockState::default();
        let phases = vec![
            make_phase(
                "rate_limits",
                |s| {
                    s.events.push("rate:fwd");
                    Ok(())
                },
                |s| s.events.push("rate:rb"),
            ),
            make_phase(
                "listen_port",
                |s| {
                    s.events.push("port:fwd-fail");
                    Err(ApplyError::ListenPortInUse {
                        attempted: 6881,
                        existing: 51413,
                    })
                },
                |s| {
                    s.events.push("port:rb-UNEXPECTED");
                    panic!("rollback must NOT run for the failed phase")
                },
            ),
            make_phase(
                "dht",
                |_| panic!("forward after a failed phase must NOT run"),
                |s| {
                    s.events.push("dht:rb-UNEXPECTED");
                    panic!("rollback of a never-run phase must NOT run")
                },
            ),
        ];

        let result = apply_phases_with_rollback(&mut state, phases);
        assert!(matches!(
            result,
            Err(ApplyError::ListenPortInUse {
                attempted: 6881,
                existing: 51413
            })
        ));
        assert_eq!(state.events, vec!["rate:fwd", "port:fwd-fail", "rate:rb"]);
    }

    #[test]
    fn first_phase_fails_no_rollback() {
        let mut state = MockState::default();
        let phases = vec![
            make_phase(
                "rate_limits",
                |s| {
                    s.events.push("rate:fwd-fail");
                    Err(ApplyError::ValidationFailed("rate too low".into()))
                },
                |s| {
                    s.events.push("rate:rb-UNEXPECTED");
                    panic!("rollback must NOT run for failed forward")
                },
            ),
            make_phase(
                "listen_port",
                |_| panic!("subsequent forward must NOT run after a failure"),
                |s| {
                    s.events.push("port:rb-UNEXPECTED");
                    panic!("subsequent rollback must NOT run after a failure")
                },
            ),
        ];

        let result = apply_phases_with_rollback(&mut state, phases);
        assert!(matches!(result, Err(ApplyError::ValidationFailed(_))));
        assert_eq!(state.events, vec!["rate:fwd-fail"]);
    }

    #[test]
    fn reconfig_in_flight_single_lock_releases_on_drop() {
        let guard = ReconfigInFlight::new();
        {
            let _lock = guard.try_lock().expect("first lock should succeed");
            assert!(
                guard.try_lock().is_none(),
                "second concurrent lock must fail"
            );
        }
        // After drop, the slot is free again.
        assert!(
            guard.try_lock().is_some(),
            "lock must be acquirable again after the guard is dropped"
        );
    }

    #[test]
    fn reconfig_in_flight_clones_share_one_slot() {
        let controller = ReconfigInFlight::new();
        let handle = controller.clone();
        let lock = controller.try_lock().expect("first lock should succeed");
        assert!(
            handle.try_lock().is_none(),
            "a clone must observe the slot held through the original"
        );
        drop(lock);
        assert!(
            handle.try_lock().is_some(),
            "a clone must observe the release by the original's guard"
        );
    }

    #[test]
    fn reconfig_in_flight_default_is_unlocked() {
        let g = ReconfigInFlight::default();
        assert!(g.try_lock().is_some(), "fresh guard should be unlocked");
    }

    #[test]
    fn apply_error_http_status_classification() {
        assert_eq!(ApplyError::ValidationFailed("x".into()).http_status(), 400);
        assert_eq!(
            ApplyError::ListenPortInUse {
                attempted: 6881,
                existing: 51413
            }
            .http_status(),
            409
        );
        assert_eq!(ApplyError::ConcurrentReconfig.http_status(), 409);
        assert_eq!(ApplyError::DhtRestartFailed("d".into()).http_status(), 500);
        assert_eq!(ApplyError::LsdRestartFailed("l".into()).http_status(), 500);
        assert_eq!(ApplyError::NatRefreshFailed("n".into()).http_status(), 200);
        assert_eq!(ApplyError::Io("i".into()).http_status(), 500);
    }

    #[test]
    fn apply_error_display_messages() {
        assert_eq!(
            ApplyError::ListenPortInUse {
                attempted: 6881,
                existing: 51413
            }
            .to_string(),
            "listen port 6881 in use (still listening on 51413)"
        );
        assert_eq!(
            ApplyError::ConcurrentReconfig.to_string(),
            "concurrent reconfig in flight, retry shortly"
        );
        assert_eq!(
            ApplyError::ValidationFailed("bad".into()).to_string(),
            "invalid settings: bad"
        );
        assert_eq!(
            ApplyError::Io("disk".into()).to_string(),
            "I/O during reconfig: disk"
        );
    }

    #[test]
    fn apply_error_is_fatal_returns_false_in_b1() {
        // B1 ships always-false; M174+ will introduce session-degraded mode.
        assert!(!ApplyError::ValidationFailed("x".into()).is_fatal());
        assert!(!ApplyError::ConcurrentReconfig.is_fatal());
    }
}