// reddb_server/replication/rollback.rs

//! Auto-rollback of a deposed primary to the common point (issue #840,
//! PRD #819, ADR 0030).
//!
//! When a former primary rejoins after a failover still holding writes
//! above the point its log last agreed with the new primary — a
//! *divergent tail* — it must drop that tail to rejoin a single timeline.
//! The tail is, by definition, **non-committed**: it sits above the
//! commit watermark (the highest LSN durably replicated to a quorum), so
//! removing it from the live timeline is correct (ADR 0030,
//! `NeverRollbackCommitted`).
//!
//! This module is the *recover-to-LSN* mechanism that does that drop:
//!
//! 1. **Plan & guard the boundary.** The recover target is the *common
//!    point* — the LSN up to which the deposed primary's log still agrees
//!    with the new primary (produced by the election, #834). The hard
//!    invariant is that the common point is **at or above the commit
//!    watermark** (#822): nothing at or below the watermark is ever rolled
//!    back. If the common point is below the watermark, the coordinator
//!    **refuses** to roll back rather than destroy committed data.
//! 2. **Preserve the tail.** Read the divergent tail and persist it to a
//!    rollback file *before* anything is removed. Rollback is never
//!    silent: if the tail cannot be persisted, the recovery aborts and no
//!    data is dropped.
//! 3. **Recover-to-LSN.** Roll the live timeline back to the common point
//!    over the MVCC history store (ADR 0014), discarding the tail's
//!    versions and restoring the pre-images visible at the common point.
//! 4. **Surface a loud operator event** so the discarded writes stay
//!    auditable and reconcilable.
//! 5. **Rejoin as a replica** of the new primary under the new term.
//!
//! ## Module shape
//!
//! [`RollbackCoordinator::run`] is a pure state machine. The boundary
//! math ([`RollbackPlan::compute`]) is separated out so the invariant can
//! be asserted in isolation. Every side effect — reading the tail,
//! writing the rollback file, the MVCC recover-to-LSN, the operator
//! event, the role swap — is injected behind [`RollbackTransport`], so
//! the whole flow runs deterministically against a scripted fake with no
//! engine, disk, clock, or network dependency. Wiring the transport onto
//! the real MVCC history store and the gRPC role-swap belongs to the
//! transport layer once the election (#834) and stale-term fencing (#835)
//! are live; this slice builds and proves the mechanism in isolation.

use std::fmt;

use super::failover::NodeRole;

/// A single record from the divergent tail that is about to be discarded.
///
/// Carries enough to reconstruct the write for an operator who later
/// reconciles a rollback file: its LSN, the term that produced it, and the
/// opaque record payload.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TailRecord {
    /// LSN of this record on the deposed primary's local timeline.
    pub lsn: u64,
    /// The replication term under which the deposed primary wrote it.
    pub term: u64,
    /// Opaque encoded record bytes, preserved verbatim in the rollback
    /// file.
    pub payload: Vec<u8>,
}

impl TailRecord {
    /// Build a tail record from its LSN, the term that wrote it, and its
    /// payload.
    ///
    /// `payload` accepts anything convertible into a `Vec<u8>` (an owned
    /// vector, a byte slice, a string, ...); the bytes are stored as given.
    #[must_use]
    pub fn new(lsn: u64, term: u64, payload: impl Into<Vec<u8>>) -> Self {
        Self {
            lsn,
            term,
            payload: payload.into(),
        }
    }
}

73/// The divergent tail removed from the live timeline: the records in
74/// `(common_point_lsn, to_lsn]` that never reached quorum.
75#[derive(Debug, Clone, PartialEq, Eq)]
76pub struct DivergentTail {
77    /// The common point — exclusive lower bound. Records at or below this
78    /// LSN are kept; this is the recover-to-LSN target.
79    pub common_point_lsn: u64,
80    /// Inclusive upper bound — the deposed primary's local frontier.
81    pub to_lsn: u64,
82    /// The tail records, in LSN order. May be shorter than the LSN span
83    /// (e.g. sparse / coalesced records); the span is authoritative for
84    /// the boundary, the records are what gets preserved.
85    pub records: Vec<TailRecord>,
86}
87
88impl DivergentTail {
89    /// Number of LSNs removed from the live timeline.
90    pub fn span_lsns(&self) -> u64 {
91        self.to_lsn.saturating_sub(self.common_point_lsn)
92    }
93}
94
95/// The computed, side-effect-free rollback plan. Splitting this out lets
96/// the boundary invariant be asserted without driving any transport.
97#[derive(Debug, Clone, PartialEq, Eq)]
98pub struct RollbackPlan {
99    /// The recover-to-LSN target: the common point with the new primary.
100    pub recover_to_lsn: u64,
101    /// The deposed primary's local frontier (inclusive tail upper bound).
102    pub local_frontier: u64,
103    /// The commit watermark — the durable floor that bounds the recover
104    /// target from below.
105    pub commit_watermark: u64,
106    /// Number of LSNs the tail spans (`local_frontier - recover_to_lsn`).
107    pub tail_lsns: u64,
108}
109
110impl RollbackPlan {
111    /// Compute and validate the rollback plan for `req`.
112    ///
113    /// Enforces the hard invariant: the recover target (common point)
114    /// must be **at or above** the commit watermark, so nothing at or
115    /// below the watermark is ever rolled back. A common point below the
116    /// watermark means the election handed us a target that would discard
117    /// committed data — a contract violation we refuse rather than honour.
118    pub fn compute(req: &RollbackRequest) -> Result<Self, RollbackError> {
119        if req.common_point < req.commit_watermark {
120            return Err(RollbackError::WatermarkViolation {
121                common_point: req.common_point,
122                commit_watermark: req.commit_watermark,
123            });
124        }
125        Ok(Self {
126            recover_to_lsn: req.common_point,
127            local_frontier: req.local_frontier,
128            commit_watermark: req.commit_watermark,
129            tail_lsns: req.local_frontier.saturating_sub(req.common_point),
130        })
131    }
132
133    /// Whether there is a divergent tail to roll back. When the local
134    /// frontier is at or below the common point the node is already on the
135    /// shared timeline and only needs to rejoin.
136    pub fn has_divergent_tail(&self) -> bool {
137        self.local_frontier > self.recover_to_lsn
138    }
139}
140
/// A request to auto-rollback a deposed primary to the common point and
/// rejoin it as a replica.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RollbackRequest {
    /// The deposed primary's highest local LSN (tail upper bound).
    pub local_frontier: u64,
    /// The common point with the new primary — the recover-to-LSN target,
    /// produced by the election (#834).
    pub common_point: u64,
    /// The commit watermark — the highest LSN durably replicated to a
    /// quorum (#822). The recover target may never fall below this.
    pub commit_watermark: u64,
    /// Dial address of the new primary the node rejoins as a replica of.
    pub new_primary_addr: String,
    /// The term the new primary serves; the rejoining replica follows it.
    pub new_term: u64,
}

/// The loud operator event payload describing a completed rollback,
/// handed to [`RollbackTransport::emit_rollback_event`]. Mirrors
/// [`crate::telemetry::operator_event::OperatorEvent::DeposedPrimaryRollback`]
/// so the production transport can forward it verbatim while a test
/// transport can capture it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RollbackEvent {
    /// The common point the timeline was recovered to (exclusive lower
    /// bound of the discarded tail).
    pub common_point_lsn: u64,
    /// Inclusive upper bound of the discarded tail — the deposed
    /// primary's local frontier.
    pub tail_to_lsn: u64,
    /// Number of LSNs the discarded tail spans.
    pub tail_lsns: u64,
    /// The commit watermark in force when the rollback was planned.
    pub commit_watermark: u64,
    /// Path/handle of the rollback file the tail was preserved to.
    pub rollback_file: String,
    /// Dial address of the new primary the node rejoins.
    pub new_primary_addr: String,
    /// The term the new primary serves.
    pub new_term: u64,
}

175/// The result of a completed rejoin.
176#[derive(Debug, Clone, PartialEq, Eq)]
177pub struct RollbackOutcome {
178    /// The LSN the node recovered to — the common point.
179    pub recovered_to_lsn: u64,
180    /// Number of LSNs removed from the live timeline (`0` when there was
181    /// no divergent tail).
182    pub tail_lsns: u64,
183    /// Where the discarded tail was preserved. `None` only when there was
184    /// no tail to preserve.
185    pub rollback_file: Option<String>,
186    /// Whether the loud operator event fired. Always `true` when a tail
187    /// was discarded; `false` for a clean rejoin with no tail.
188    pub event_fired: bool,
189    /// The role the node now plays — a replica of the new primary under
190    /// the new term.
191    pub role: NodeRole,
192}
193
194impl RollbackOutcome {
195    /// True when a divergent tail was actually rolled back (as opposed to
196    /// a clean rejoin with nothing to discard).
197    pub fn rolled_back_tail(&self) -> bool {
198        self.tail_lsns > 0
199    }
200}
201
/// Why an auto-rollback could not complete.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RollbackError {
    /// The common point is below the commit watermark, so recovering to it
    /// would roll back committed data. Refused — nothing was changed. This
    /// should never happen given the election vote rule (ADR 0030); if it
    /// does, the cluster has a deeper consistency fault that needs an
    /// operator, not a silent data loss.
    WatermarkViolation {
        common_point: u64,
        commit_watermark: u64,
    },
    /// The divergent tail could not be persisted to a rollback file.
    /// Recovery aborted **before** removing anything: rollback is never
    /// silent, so if the tail cannot be preserved it is not discarded.
    TailPersistFailed { reason: String },
    /// The recover-to-LSN over the MVCC history store failed. The tail was
    /// already preserved to a rollback file, but the live timeline was not
    /// rolled back; the node must not rejoin until an operator resolves it.
    RecoverFailed { target_lsn: u64, reason: String },
}

/// Operator-facing messages: each one states what was refused or failed
/// and what state the node was left in.
impl fmt::Display for RollbackError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::WatermarkViolation {
                common_point,
                commit_watermark,
            } => write!(
                f,
                "auto-rollback refused: common point {common_point} is below the commit watermark \
                 {commit_watermark}; recovering to it would roll back committed data",
            ),
            Self::TailPersistFailed { reason } => write!(
                f,
                "auto-rollback aborted: could not persist divergent tail to a rollback file \
                 ({reason}); nothing was rolled back",
            ),
            Self::RecoverFailed { target_lsn, reason } => write!(
                f,
                "auto-rollback failed: recover-to-LSN {target_lsn} over the MVCC history store \
                 failed ({reason}); the divergent tail was preserved but the timeline was not \
                 rolled back",
            ),
        }
    }
}

impl std::error::Error for RollbackError {}

252/// Side effects the rollback coordinator drives, injected so the state
253/// machine stays pure and deterministically testable.
254///
255/// Production implementors back these onto the MVCC history store
256/// (ADR 0014) for the recover-to-LSN, the rollback-file writer, the
257/// [`crate::telemetry::operator_event::OperatorEvent`] bus, and the gRPC
258/// role-swap. Tests back them onto a scripted fake.
259pub trait RollbackTransport {
260    /// Read the divergent tail records in `(from_exclusive, to_inclusive]`
261    /// from the local timeline / MVCC history store, in LSN order.
262    fn read_divergent_tail(&mut self, from_exclusive: u64, to_inclusive: u64) -> Vec<TailRecord>;
263
264    /// Persist the divergent tail to a durable rollback file and return a
265    /// path/handle that identifies it. Returning `Err` aborts the
266    /// rollback **before** any data is removed — rollback is never silent.
267    fn persist_rollback_file(&mut self, tail: &DivergentTail) -> Result<String, String>;
268
269    /// Recover the live timeline to `target_lsn` over the MVCC history
270    /// store, discarding every version above it and restoring the
271    /// pre-images visible at `target_lsn`.
272    fn recover_to_lsn(&mut self, target_lsn: u64) -> Result<(), String>;
273
274    /// Emit the loud, auditable operator event for the completed rollback.
275    fn emit_rollback_event(&mut self, event: RollbackEvent);
276
277    /// Reconfigure the node to stream as a replica of `primary_addr` under
278    /// `term`.
279    fn rejoin_as_replica(&mut self, primary_addr: &str, term: u64);
280}
281
/// The deposed-primary auto-rollback state machine.
///
/// A stateless unit type: it only namespaces the associated `run`
/// function, and every input arrives through that function's arguments.
#[derive(Debug, Clone, Copy, Default)]
pub struct RollbackCoordinator;

285impl RollbackCoordinator {
286    /// Execute the auto-rollback described by `req`, driving the node
287    /// through `tx`.
288    ///
289    /// Ordering is chosen so the hard guarantees hold even on partial
290    /// failure:
291    ///
292    /// 1. Compute & guard the boundary — refuse if it would cross the
293    ///    watermark, changing nothing.
294    /// 2. If there is no divergent tail, just rejoin.
295    /// 3. Read and **persist** the tail before removing anything; abort if
296    ///    it cannot be preserved.
297    /// 4. Recover-to-LSN to the common point.
298    /// 5. Fire the loud operator event.
299    /// 6. Rejoin as a replica of the new primary.
300    pub fn run(
301        req: &RollbackRequest,
302        tx: &mut dyn RollbackTransport,
303    ) -> Result<RollbackOutcome, RollbackError> {
304        let plan = RollbackPlan::compute(req)?;
305
306        let role = NodeRole::Replica {
307            primary_addr: req.new_primary_addr.clone(),
308            term: req.new_term,
309        };
310
311        // No divergent tail: the node is already on the shared timeline.
312        // Just rejoin — nothing to preserve, nothing to roll back, no
313        // operator event.
314        if !plan.has_divergent_tail() {
315            tx.rejoin_as_replica(&req.new_primary_addr, req.new_term);
316            return Ok(RollbackOutcome {
317                recovered_to_lsn: plan.recover_to_lsn,
318                tail_lsns: 0,
319                rollback_file: None,
320                event_fired: false,
321                role,
322            });
323        }
324
325        // Read the tail and preserve it BEFORE removing anything. If we
326        // cannot persist it, abort without rolling back — rollback is
327        // never silent.
328        let records = tx.read_divergent_tail(plan.recover_to_lsn, plan.local_frontier);
329        let tail = DivergentTail {
330            common_point_lsn: plan.recover_to_lsn,
331            to_lsn: plan.local_frontier,
332            records,
333        };
334        let rollback_file = tx
335            .persist_rollback_file(&tail)
336            .map_err(|reason| RollbackError::TailPersistFailed { reason })?;
337
338        // Recover the live timeline to the common point over the MVCC
339        // history store. The tail is already safe in the rollback file.
340        tx.recover_to_lsn(plan.recover_to_lsn)
341            .map_err(|reason| RollbackError::RecoverFailed {
342                target_lsn: plan.recover_to_lsn,
343                reason,
344            })?;
345
346        // Surface the discarded writes loudly so they stay auditable.
347        tx.emit_rollback_event(RollbackEvent {
348            common_point_lsn: plan.recover_to_lsn,
349            tail_to_lsn: plan.local_frontier,
350            tail_lsns: plan.tail_lsns,
351            commit_watermark: plan.commit_watermark,
352            rollback_file: rollback_file.clone(),
353            new_primary_addr: req.new_primary_addr.clone(),
354            new_term: req.new_term,
355        });
356
357        // Rejoin as a replica of the new primary under the new term.
358        tx.rejoin_as_replica(&req.new_primary_addr, req.new_term);
359
360        Ok(RollbackOutcome {
361            recovered_to_lsn: plan.recover_to_lsn,
362            tail_lsns: plan.tail_lsns,
363            rollback_file: Some(rollback_file),
364            event_fired: true,
365            role,
366        })
367    }
368}
369
#[cfg(test)]
mod tests {
    use super::*;

    /// A scripted fake recording every side effect so tests can assert
    /// ordering and content. `persist_should_fail` / `recover_should_fail`
    /// drive the abort paths.
    struct FakeTransport {
        /// Records the fake hands back from `read_divergent_tail`.
        available_tail: Vec<TailRecord>,
        persist_should_fail: bool,
        recover_should_fail: bool,
        // Captured effects, in order.
        persisted: Option<DivergentTail>,
        recovered_to: Option<u64>,
        emitted: Option<RollbackEvent>,
        rejoined: Option<(String, u64)>,
        /// Order log of effect names, to assert preserve-before-recover.
        order: Vec<&'static str>,
    }

    impl FakeTransport {
        fn new(available_tail: Vec<TailRecord>) -> Self {
            Self {
                available_tail,
                persist_should_fail: false,
                recover_should_fail: false,
                persisted: None,
                recovered_to: None,
                emitted: None,
                rejoined: None,
                order: Vec::new(),
            }
        }
    }

    impl RollbackTransport for FakeTransport {
        fn read_divergent_tail(
            &mut self,
            from_exclusive: u64,
            to_inclusive: u64,
        ) -> Vec<TailRecord> {
            self.order.push("read");
            self.available_tail
                .iter()
                .filter(|r| r.lsn > from_exclusive && r.lsn <= to_inclusive)
                .cloned()
                .collect()
        }

        fn persist_rollback_file(&mut self, tail: &DivergentTail) -> Result<String, String> {
            self.order.push("persist");
            if self.persist_should_fail {
                return Err("disk full".to_string());
            }
            self.persisted = Some(tail.clone());
            Ok(format!(
                "/data/rollback/lsn-{}-{}.rbk",
                tail.common_point_lsn, tail.to_lsn
            ))
        }

        fn recover_to_lsn(&mut self, target_lsn: u64) -> Result<(), String> {
            self.order.push("recover");
            if self.recover_should_fail {
                return Err("history truncated".to_string());
            }
            self.recovered_to = Some(target_lsn);
            Ok(())
        }

        fn emit_rollback_event(&mut self, event: RollbackEvent) {
            self.order.push("emit");
            self.emitted = Some(event);
        }

        fn rejoin_as_replica(&mut self, primary_addr: &str, term: u64) {
            self.order.push("rejoin");
            self.rejoined = Some((primary_addr.to_string(), term));
        }
    }

    fn request(local_frontier: u64, common_point: u64, watermark: u64) -> RollbackRequest {
        RollbackRequest {
            local_frontier,
            common_point,
            commit_watermark: watermark,
            new_primary_addr: "http://node-b:50051".to_string(),
            new_term: 8,
        }
    }

    /// Build tail records for `lsns`, all written under `term`.
    ///
    /// The payload is the full little-endian encoding of the LSN, so
    /// distinct LSNs always get distinct payloads (a narrowing cast to
    /// `u8` would wrap for LSNs above 255).
    fn tail(lsns: &[u64], term: u64) -> Vec<TailRecord> {
        lsns.iter()
            .map(|&lsn| TailRecord::new(lsn, term, lsn.to_le_bytes().to_vec()))
            .collect()
    }

    // ------------------------------------------------------------------
    // Boundary math (pure plan)
    // ------------------------------------------------------------------

    #[test]
    fn plan_recovers_to_common_point_and_sizes_the_tail() {
        let plan = RollbackPlan::compute(&request(230, 200, 200)).expect("valid plan");
        assert_eq!(
            plan.recover_to_lsn, 200,
            "recover target is the common point"
        );
        assert_eq!(plan.tail_lsns, 30, "tail spans common_point..frontier");
        assert!(plan.has_divergent_tail());
    }

    #[test]
    fn plan_with_common_point_above_watermark_is_allowed() {
        // The common point may sit ABOVE the watermark — the deposed
        // primary agreed with the new primary past the durable floor.
        // Only what is above the common point is rolled back.
        let plan = RollbackPlan::compute(&request(300, 250, 200)).expect("valid plan");
        assert_eq!(plan.recover_to_lsn, 250);
        assert_eq!(plan.tail_lsns, 50);
    }

    #[test]
    fn plan_refuses_common_point_below_watermark() {
        // HARD INVARIANT: nothing at or below the commit watermark is ever
        // rolled back. A common point below the watermark would do exactly
        // that, so the plan is refused.
        let err = RollbackPlan::compute(&request(300, 150, 200)).expect_err("must refuse");
        assert_eq!(
            err,
            RollbackError::WatermarkViolation {
                common_point: 150,
                commit_watermark: 200,
            }
        );
    }

    #[test]
    fn plan_at_watermark_is_the_inclusive_floor() {
        // common_point == watermark is allowed: the watermark itself is
        // kept, only strictly-above records are rolled back.
        let plan = RollbackPlan::compute(&request(220, 200, 200)).expect("valid at floor");
        assert_eq!(plan.recover_to_lsn, 200);
        assert_eq!(plan.tail_lsns, 20);
    }

    // ------------------------------------------------------------------
    // Full run: happy path
    // ------------------------------------------------------------------

    #[test]
    fn run_preserves_tail_then_recovers_then_emits_then_rejoins() {
        let mut tx = FakeTransport::new(tail(&[201, 210, 230], 7));
        let outcome =
            RollbackCoordinator::run(&request(230, 200, 200), &mut tx).expect("rollback succeeds");

        // Boundary: recovered to the common point, tail sized correctly.
        assert_eq!(outcome.recovered_to_lsn, 200);
        assert_eq!(outcome.tail_lsns, 30);
        assert!(outcome.rolled_back_tail());

        // Tail preserved: the rollback file holds exactly the records
        // above the common point.
        let persisted = tx.persisted.as_ref().expect("tail persisted");
        assert_eq!(persisted.common_point_lsn, 200);
        assert_eq!(persisted.to_lsn, 230);
        assert_eq!(persisted.records, tail(&[201, 210, 230], 7));
        assert_eq!(
            outcome.rollback_file.as_deref(),
            Some("/data/rollback/lsn-200-230.rbk")
        );

        // Recover-to-LSN hit the common point.
        assert_eq!(tx.recovered_to, Some(200));

        // Loud operator event fired with the boundary + file + new primary.
        assert!(outcome.event_fired);
        let ev = tx.emitted.as_ref().expect("event emitted");
        assert_eq!(ev.common_point_lsn, 200);
        assert_eq!(ev.tail_to_lsn, 230);
        assert_eq!(ev.tail_lsns, 30);
        assert_eq!(ev.commit_watermark, 200);
        assert_eq!(ev.rollback_file, "/data/rollback/lsn-200-230.rbk");
        assert_eq!(ev.new_primary_addr, "http://node-b:50051");
        assert_eq!(ev.new_term, 8);

        // Rejoined as a replica of the new primary under the new term.
        assert_eq!(tx.rejoined, Some(("http://node-b:50051".to_string(), 8)));
        assert_eq!(
            outcome.role,
            NodeRole::Replica {
                primary_addr: "http://node-b:50051".to_string(),
                term: 8,
            }
        );

        // Critical ordering: tail is preserved BEFORE the timeline is
        // recovered, and the event fires before rejoin.
        assert_eq!(
            tx.order,
            vec!["read", "persist", "recover", "emit", "rejoin"]
        );
    }

    // ------------------------------------------------------------------
    // Full run: no divergent tail → clean rejoin, no rollback, no event
    // ------------------------------------------------------------------

    #[test]
    fn run_with_no_tail_just_rejoins() {
        // Frontier == common point: nothing diverged.
        let mut tx = FakeTransport::new(vec![]);
        let outcome =
            RollbackCoordinator::run(&request(200, 200, 200), &mut tx).expect("clean rejoin");

        assert_eq!(outcome.tail_lsns, 0);
        assert!(!outcome.rolled_back_tail());
        assert!(!outcome.event_fired, "no event when nothing is discarded");
        assert_eq!(outcome.rollback_file, None);
        assert!(tx.persisted.is_none(), "nothing persisted");
        assert!(tx.recovered_to.is_none(), "nothing recovered");
        assert!(tx.emitted.is_none(), "no operator event");
        assert_eq!(tx.rejoined, Some(("http://node-b:50051".to_string(), 8)));
        assert_eq!(tx.order, vec!["rejoin"]);
    }

    #[test]
    fn run_with_frontier_below_common_point_is_a_clean_rejoin() {
        // A node strictly behind the common point has no divergent tail;
        // it just streams forward as a replica.
        let mut tx = FakeTransport::new(vec![]);
        let outcome =
            RollbackCoordinator::run(&request(180, 200, 150), &mut tx).expect("clean rejoin");
        assert_eq!(outcome.recovered_to_lsn, 200);
        assert_eq!(outcome.tail_lsns, 0);
        assert!(!outcome.event_fired);
        assert_eq!(tx.order, vec!["rejoin"]);
    }

    // ------------------------------------------------------------------
    // Full run: refusal & abort paths
    // ------------------------------------------------------------------

    #[test]
    fn run_refuses_when_common_point_below_watermark_and_touches_nothing() {
        let mut tx = FakeTransport::new(tail(&[160, 200, 300], 7));
        let err = RollbackCoordinator::run(&request(300, 150, 200), &mut tx)
            .expect_err("must refuse to cross the watermark");
        assert!(matches!(err, RollbackError::WatermarkViolation { .. }));

        // Nothing was touched: no read, no persist, no recover, no rejoin.
        assert!(tx.persisted.is_none());
        assert!(tx.recovered_to.is_none());
        assert!(tx.emitted.is_none());
        assert!(tx.rejoined.is_none());
        assert!(tx.order.is_empty());
    }

    #[test]
    fn run_aborts_without_recovering_when_tail_cannot_be_persisted() {
        // Rollback is never silent: if the tail cannot be saved, the
        // timeline is NOT rolled back.
        let mut tx = FakeTransport::new(tail(&[210, 230], 7));
        tx.persist_should_fail = true;
        let err = RollbackCoordinator::run(&request(230, 200, 200), &mut tx)
            .expect_err("must abort when persist fails");
        // The transport's failure reason is carried through verbatim.
        assert_eq!(
            err,
            RollbackError::TailPersistFailed {
                reason: "disk full".to_string(),
            }
        );

        // Read + attempted persist happened; recover/emit/rejoin did NOT.
        assert!(tx.recovered_to.is_none(), "must not roll back the timeline");
        assert!(tx.emitted.is_none());
        assert!(tx.rejoined.is_none());
        assert_eq!(tx.order, vec!["read", "persist"]);
    }

    #[test]
    fn run_surfaces_recover_failure_after_preserving_the_tail() {
        let mut tx = FakeTransport::new(tail(&[210, 230], 7));
        tx.recover_should_fail = true;
        let err = RollbackCoordinator::run(&request(230, 200, 200), &mut tx)
            .expect_err("recover failure surfaces");
        // Both the target and the transport's reason are reported.
        assert_eq!(
            err,
            RollbackError::RecoverFailed {
                target_lsn: 200,
                reason: "history truncated".to_string(),
            }
        );

        // The tail was preserved before the failed recover, so the writes
        // are not lost; but the node did not rejoin on a half-rolled state.
        assert!(tx.persisted.is_some(), "tail preserved before recover");
        assert!(
            tx.emitted.is_none(),
            "no completion event on failed recover"
        );
        assert!(
            tx.rejoined.is_none(),
            "must not rejoin after a failed recover"
        );
        assert_eq!(tx.order, vec!["read", "persist", "recover"]);
    }

    #[test]
    fn span_lsns_counts_the_removed_range() {
        let t = DivergentTail {
            common_point_lsn: 200,
            to_lsn: 230,
            records: tail(&[210, 230], 7),
        };
        assert_eq!(t.span_lsns(), 30);
    }
}