reddb_server/replication/rollback.rs
1//! Auto-rollback of a deposed primary to the common point (issue #840,
2//! PRD #819, ADR 0030).
3//!
4//! When a former primary rejoins after a failover still holding writes
5//! above the point its log last agreed with the new primary — a
6//! *divergent tail* — it must drop that tail to rejoin a single timeline.
7//! The tail is, by definition, **non-committed**: it sits above the
8//! commit watermark (the highest LSN durably replicated to a quorum), so
9//! removing it from the live timeline is correct (ADR 0030,
10//! `NeverRollbackCommitted`).
11//!
12//! This module is the *recover-to-LSN* mechanism that does that drop:
13//!
14//! 1. **Plan & guard the boundary.** The recover target is the *common
15//! point* — the LSN up to which the deposed primary's log still agrees
16//! with the new primary (produced by the election, #834). The hard
17//! invariant is that the common point is **at or above the commit
18//! watermark** (#822): nothing at or below the watermark is ever rolled
19//! back. If the common point is below the watermark, the coordinator
20//! **refuses** to roll back rather than destroy committed data.
21//! 2. **Preserve the tail.** Read the divergent tail and persist it to a
22//! rollback file *before* anything is removed. Rollback is never
23//! silent: if the tail cannot be persisted, the recovery aborts and no
24//! data is dropped.
25//! 3. **Recover-to-LSN.** Roll the live timeline back to the common point
26//! over the MVCC history store (ADR 0014), discarding the tail's
27//! versions and restoring the pre-images visible at the common point.
28//! 4. **Surface a loud operator event** so the discarded writes stay
29//! auditable and reconcilable.
30//! 5. **Rejoin as a replica** of the new primary under the new term.
31//!
32//! ## Module shape
33//!
34//! [`RollbackCoordinator::run`] is a pure state machine. The boundary
35//! math ([`RollbackPlan::compute`]) is separated out so the invariant can
36//! be asserted in isolation. Every side effect — reading the tail,
37//! writing the rollback file, the MVCC recover-to-LSN, the operator
38//! event, the role swap — is injected behind [`RollbackTransport`], so
39//! the whole flow runs deterministically against a scripted fake with no
40//! engine, disk, clock, or network dependency. Wiring the transport onto
41//! the real MVCC history store and the gRPC role-swap belongs to the
42//! transport layer once the election (#834) and stale-term fencing (#835)
43//! are live; this slice builds and proves the mechanism in isolation.
44
45use super::failover::NodeRole;
46
/// One entry of the divergent tail that the rollback is about to drop
/// from the live timeline.
///
/// Each record keeps what an operator needs to reconcile the write from a
/// rollback file later on: where it sat (`lsn`), which term wrote it
/// (`term`), and its opaque encoded bytes (`payload`).
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct TailRecord {
    /// Position of this record on the deposed primary's local timeline.
    pub lsn: u64,
    /// Replication term in which the deposed primary produced the record.
    pub term: u64,
    /// Encoded record bytes, kept verbatim so the rollback file preserves
    /// the write exactly as it was.
    pub payload: Vec<u8>,
}

impl TailRecord {
    /// Builds a tail record. `payload` accepts anything convertible into a
    /// byte vector (for example `Vec<u8>`, `&[u8]` or `&str`).
    pub fn new(lsn: u64, term: u64, payload: impl Into<Vec<u8>>) -> Self {
        let payload: Vec<u8> = payload.into();
        TailRecord { lsn, term, payload }
    }
}
72
73/// The divergent tail removed from the live timeline: the records in
74/// `(common_point_lsn, to_lsn]` that never reached quorum.
75#[derive(Debug, Clone, PartialEq, Eq)]
76pub struct DivergentTail {
77 /// The common point — exclusive lower bound. Records at or below this
78 /// LSN are kept; this is the recover-to-LSN target.
79 pub common_point_lsn: u64,
80 /// Inclusive upper bound — the deposed primary's local frontier.
81 pub to_lsn: u64,
82 /// The tail records, in LSN order. May be shorter than the LSN span
83 /// (e.g. sparse / coalesced records); the span is authoritative for
84 /// the boundary, the records are what gets preserved.
85 pub records: Vec<TailRecord>,
86}
87
88impl DivergentTail {
89 /// Number of LSNs removed from the live timeline.
90 pub fn span_lsns(&self) -> u64 {
91 self.to_lsn.saturating_sub(self.common_point_lsn)
92 }
93}
94
95/// The computed, side-effect-free rollback plan. Splitting this out lets
96/// the boundary invariant be asserted without driving any transport.
97#[derive(Debug, Clone, PartialEq, Eq)]
98pub struct RollbackPlan {
99 /// The recover-to-LSN target: the common point with the new primary.
100 pub recover_to_lsn: u64,
101 /// The deposed primary's local frontier (inclusive tail upper bound).
102 pub local_frontier: u64,
103 /// The commit watermark — the durable floor that bounds the recover
104 /// target from below.
105 pub commit_watermark: u64,
106 /// Number of LSNs the tail spans (`local_frontier - recover_to_lsn`).
107 pub tail_lsns: u64,
108}
109
110impl RollbackPlan {
111 /// Compute and validate the rollback plan for `req`.
112 ///
113 /// Enforces the hard invariant: the recover target (common point)
114 /// must be **at or above** the commit watermark, so nothing at or
115 /// below the watermark is ever rolled back. A common point below the
116 /// watermark means the election handed us a target that would discard
117 /// committed data — a contract violation we refuse rather than honour.
118 pub fn compute(req: &RollbackRequest) -> Result<Self, RollbackError> {
119 if req.common_point < req.commit_watermark {
120 return Err(RollbackError::WatermarkViolation {
121 common_point: req.common_point,
122 commit_watermark: req.commit_watermark,
123 });
124 }
125 Ok(Self {
126 recover_to_lsn: req.common_point,
127 local_frontier: req.local_frontier,
128 commit_watermark: req.commit_watermark,
129 tail_lsns: req.local_frontier.saturating_sub(req.common_point),
130 })
131 }
132
133 /// Whether there is a divergent tail to roll back. When the local
134 /// frontier is at or below the common point the node is already on the
135 /// shared timeline and only needs to rejoin.
136 pub fn has_divergent_tail(&self) -> bool {
137 self.local_frontier > self.recover_to_lsn
138 }
139}
140
/// Input to the auto-rollback: roll a deposed primary back to the common
/// point, then rejoin it as a replica of the new primary.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct RollbackRequest {
    /// Highest LSN the deposed primary holds locally; the tail's upper bound.
    pub local_frontier: u64,
    /// LSN up to which the deposed primary still agrees with the new
    /// primary. This is the recover-to-LSN target, supplied by the
    /// election (#834).
    pub common_point: u64,
    /// Highest LSN durably replicated to a quorum (#822). The recover
    /// target is never allowed to fall below it.
    pub commit_watermark: u64,
    /// Address the node dials to follow the new primary as a replica.
    pub new_primary_addr: String,
    /// Term served by the new primary; the rejoined replica follows it.
    pub new_term: u64,
}
158
/// Payload of the loud operator event that reports a completed rollback,
/// passed to [`RollbackTransport::emit_rollback_event`].
///
/// It mirrors
/// [`crate::telemetry::operator_event::OperatorEvent::DeposedPrimaryRollback`],
/// so a production transport can forward it unchanged and a test transport
/// can simply capture it.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct RollbackEvent {
    /// The common point the live timeline was recovered to.
    pub common_point_lsn: u64,
    /// Inclusive upper bound of the discarded tail (the old local frontier).
    pub tail_to_lsn: u64,
    /// Number of LSNs removed from the live timeline.
    pub tail_lsns: u64,
    /// Commit watermark in force when the rollback was planned.
    pub commit_watermark: u64,
    /// Path/handle of the rollback file that preserves the discarded tail.
    pub rollback_file: String,
    /// Address of the new primary the node rejoins.
    pub new_primary_addr: String,
    /// Term the rejoining replica follows.
    pub new_term: u64,
}
174
175/// The result of a completed rejoin.
176#[derive(Debug, Clone, PartialEq, Eq)]
177pub struct RollbackOutcome {
178 /// The LSN the node recovered to — the common point.
179 pub recovered_to_lsn: u64,
180 /// Number of LSNs removed from the live timeline (`0` when there was
181 /// no divergent tail).
182 pub tail_lsns: u64,
183 /// Where the discarded tail was preserved. `None` only when there was
184 /// no tail to preserve.
185 pub rollback_file: Option<String>,
186 /// Whether the loud operator event fired. Always `true` when a tail
187 /// was discarded; `false` for a clean rejoin with no tail.
188 pub event_fired: bool,
189 /// The role the node now plays — a replica of the new primary under
190 /// the new term.
191 pub role: NodeRole,
192}
193
194impl RollbackOutcome {
195 /// True when a divergent tail was actually rolled back (as opposed to
196 /// a clean rejoin with nothing to discard).
197 pub fn rolled_back_tail(&self) -> bool {
198 self.tail_lsns > 0
199 }
200}
201
/// Reasons an auto-rollback did not complete.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum RollbackError {
    /// Refused: the common point lies below the commit watermark, so
    /// recovering to it would roll back committed data. Nothing was changed.
    /// Given the election vote rule (ADR 0030) this should never happen; if
    /// it does, the cluster has a deeper consistency fault that needs an
    /// operator rather than a silent data loss.
    WatermarkViolation {
        common_point: u64,
        commit_watermark: u64,
    },
    /// Aborted: the divergent tail could not be written to a rollback file.
    /// Recovery stopped **before** removing anything. Rollback is never
    /// silent, so a tail that cannot be preserved is not discarded.
    TailPersistFailed { reason: String },
    /// Failed: the recover-to-LSN over the MVCC history store returned an
    /// error. The tail had already been preserved in a rollback file, but the
    /// live timeline was not rolled back, and the node must not rejoin until
    /// an operator resolves it.
    RecoverFailed { target_lsn: u64, reason: String },
}

impl std::fmt::Display for RollbackError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::WatermarkViolation {
                common_point,
                commit_watermark,
            } => write!(
                f,
                "auto-rollback refused: common point {} is below the commit watermark {}; \
                 recovering to it would roll back committed data",
                common_point, commit_watermark
            ),
            Self::TailPersistFailed { reason } => write!(
                f,
                "auto-rollback aborted: could not persist divergent tail to a rollback file ({}); \
                 nothing was rolled back",
                reason
            ),
            Self::RecoverFailed { target_lsn, reason } => write!(
                f,
                "auto-rollback failed: recover-to-LSN {} over the MVCC history store failed ({}); \
                 the divergent tail was preserved but the timeline was not rolled back",
                target_lsn, reason
            ),
        }
    }
}

/// Every variant is a leaf error: there is no underlying cause to chain, so
/// `source` is always `None`.
impl std::error::Error for RollbackError {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        None
    }
}
251
252/// Side effects the rollback coordinator drives, injected so the state
253/// machine stays pure and deterministically testable.
254///
255/// Production implementors back these onto the MVCC history store
256/// (ADR 0014) for the recover-to-LSN, the rollback-file writer, the
257/// [`crate::telemetry::operator_event::OperatorEvent`] bus, and the gRPC
258/// role-swap. Tests back them onto a scripted fake.
259pub trait RollbackTransport {
260 /// Read the divergent tail records in `(from_exclusive, to_inclusive]`
261 /// from the local timeline / MVCC history store, in LSN order.
262 fn read_divergent_tail(&mut self, from_exclusive: u64, to_inclusive: u64) -> Vec<TailRecord>;
263
264 /// Persist the divergent tail to a durable rollback file and return a
265 /// path/handle that identifies it. Returning `Err` aborts the
266 /// rollback **before** any data is removed — rollback is never silent.
267 fn persist_rollback_file(&mut self, tail: &DivergentTail) -> Result<String, String>;
268
269 /// Recover the live timeline to `target_lsn` over the MVCC history
270 /// store, discarding every version above it and restoring the
271 /// pre-images visible at `target_lsn`.
272 fn recover_to_lsn(&mut self, target_lsn: u64) -> Result<(), String>;
273
274 /// Emit the loud, auditable operator event for the completed rollback.
275 fn emit_rollback_event(&mut self, event: RollbackEvent);
276
277 /// Reconfigure the node to stream as a replica of `primary_addr` under
278 /// `term`.
279 fn rejoin_as_replica(&mut self, primary_addr: &str, term: u64);
280}
281
282/// The deposed-primary auto-rollback state machine.
283pub struct RollbackCoordinator;
284
285impl RollbackCoordinator {
286 /// Execute the auto-rollback described by `req`, driving the node
287 /// through `tx`.
288 ///
289 /// Ordering is chosen so the hard guarantees hold even on partial
290 /// failure:
291 ///
292 /// 1. Compute & guard the boundary — refuse if it would cross the
293 /// watermark, changing nothing.
294 /// 2. If there is no divergent tail, just rejoin.
295 /// 3. Read and **persist** the tail before removing anything; abort if
296 /// it cannot be preserved.
297 /// 4. Recover-to-LSN to the common point.
298 /// 5. Fire the loud operator event.
299 /// 6. Rejoin as a replica of the new primary.
300 pub fn run(
301 req: &RollbackRequest,
302 tx: &mut dyn RollbackTransport,
303 ) -> Result<RollbackOutcome, RollbackError> {
304 let plan = RollbackPlan::compute(req)?;
305
306 let role = NodeRole::Replica {
307 primary_addr: req.new_primary_addr.clone(),
308 term: req.new_term,
309 };
310
311 // No divergent tail: the node is already on the shared timeline.
312 // Just rejoin — nothing to preserve, nothing to roll back, no
313 // operator event.
314 if !plan.has_divergent_tail() {
315 tx.rejoin_as_replica(&req.new_primary_addr, req.new_term);
316 return Ok(RollbackOutcome {
317 recovered_to_lsn: plan.recover_to_lsn,
318 tail_lsns: 0,
319 rollback_file: None,
320 event_fired: false,
321 role,
322 });
323 }
324
325 // Read the tail and preserve it BEFORE removing anything. If we
326 // cannot persist it, abort without rolling back — rollback is
327 // never silent.
328 let records = tx.read_divergent_tail(plan.recover_to_lsn, plan.local_frontier);
329 let tail = DivergentTail {
330 common_point_lsn: plan.recover_to_lsn,
331 to_lsn: plan.local_frontier,
332 records,
333 };
334 let rollback_file = tx
335 .persist_rollback_file(&tail)
336 .map_err(|reason| RollbackError::TailPersistFailed { reason })?;
337
338 // Recover the live timeline to the common point over the MVCC
339 // history store. The tail is already safe in the rollback file.
340 tx.recover_to_lsn(plan.recover_to_lsn)
341 .map_err(|reason| RollbackError::RecoverFailed {
342 target_lsn: plan.recover_to_lsn,
343 reason,
344 })?;
345
346 // Surface the discarded writes loudly so they stay auditable.
347 tx.emit_rollback_event(RollbackEvent {
348 common_point_lsn: plan.recover_to_lsn,
349 tail_to_lsn: plan.local_frontier,
350 tail_lsns: plan.tail_lsns,
351 commit_watermark: plan.commit_watermark,
352 rollback_file: rollback_file.clone(),
353 new_primary_addr: req.new_primary_addr.clone(),
354 new_term: req.new_term,
355 });
356
357 // Rejoin as a replica of the new primary under the new term.
358 tx.rejoin_as_replica(&req.new_primary_addr, req.new_term);
359
360 Ok(RollbackOutcome {
361 recovered_to_lsn: plan.recover_to_lsn,
362 tail_lsns: plan.tail_lsns,
363 rollback_file: Some(rollback_file),
364 event_fired: true,
365 role,
366 })
367 }
368}
369
#[cfg(test)]
mod tests {
    use super::*;

    /// Address of the new primary used by every test request.
    const NEW_PRIMARY: &str = "http://node-b:50051";
    /// Term of the new primary used by every test request.
    const NEW_TERM: u64 = 8;

    /// In-memory stand-in for the real transport.
    ///
    /// It serves a fixed set of tail records and records every side effect it
    /// is asked to perform. It can also be told to fail the persist or
    /// recover step, to drive the abort paths.
    #[derive(Default)]
    struct FakeTransport {
        /// Records handed back by `read_divergent_tail`.
        available_tail: Vec<TailRecord>,
        /// When set, `persist_rollback_file` fails with "disk full".
        persist_should_fail: bool,
        /// When set, `recover_to_lsn` fails with "history truncated".
        recover_should_fail: bool,
        // Effects captured from successful calls.
        persisted: Option<DivergentTail>,
        recovered_to: Option<u64>,
        emitted: Option<RollbackEvent>,
        rejoined: Option<(String, u64)>,
        /// Every attempted effect, in call order, so tests can prove e.g.
        /// that the tail is persisted before the timeline is recovered.
        order: Vec<&'static str>,
    }

    impl FakeTransport {
        fn new(available_tail: Vec<TailRecord>) -> Self {
            Self {
                available_tail,
                ..Self::default()
            }
        }
    }

    impl RollbackTransport for FakeTransport {
        fn read_divergent_tail(
            &mut self,
            from_exclusive: u64,
            to_inclusive: u64,
        ) -> Vec<TailRecord> {
            self.order.push("read");
            let mut picked = Vec::new();
            for record in &self.available_tail {
                if from_exclusive < record.lsn && record.lsn <= to_inclusive {
                    picked.push(record.clone());
                }
            }
            picked
        }

        fn persist_rollback_file(&mut self, tail: &DivergentTail) -> Result<String, String> {
            self.order.push("persist");
            if self.persist_should_fail {
                Err(String::from("disk full"))
            } else {
                self.persisted = Some(tail.clone());
                Ok(format!(
                    "/data/rollback/lsn-{}-{}.rbk",
                    tail.common_point_lsn, tail.to_lsn
                ))
            }
        }

        fn recover_to_lsn(&mut self, target_lsn: u64) -> Result<(), String> {
            self.order.push("recover");
            if self.recover_should_fail {
                Err(String::from("history truncated"))
            } else {
                self.recovered_to = Some(target_lsn);
                Ok(())
            }
        }

        fn emit_rollback_event(&mut self, event: RollbackEvent) {
            self.order.push("emit");
            self.emitted = Some(event);
        }

        fn rejoin_as_replica(&mut self, primary_addr: &str, term: u64) {
            self.order.push("rejoin");
            self.rejoined = Some((primary_addr.to_owned(), term));
        }
    }

    /// A request that rejoins `NEW_PRIMARY` under `NEW_TERM`.
    fn request(local_frontier: u64, common_point: u64, watermark: u64) -> RollbackRequest {
        RollbackRequest {
            local_frontier,
            common_point,
            commit_watermark: watermark,
            new_primary_addr: NEW_PRIMARY.to_owned(),
            new_term: NEW_TERM,
        }
    }

    /// One record per LSN, all under `term`, each with a one-byte payload
    /// derived from its LSN.
    fn tail(lsns: &[u64], term: u64) -> Vec<TailRecord> {
        lsns.iter()
            .map(|&lsn| TailRecord::new(lsn, term, vec![lsn as u8]))
            .collect()
    }

    /// What `FakeTransport::rejoined` holds after rejoining the new primary.
    fn rejoined_new_primary() -> Option<(String, u64)> {
        Some((NEW_PRIMARY.to_owned(), NEW_TERM))
    }

    // ---- Boundary math (pure plan) -------------------------------------

    #[test]
    fn plan_recovers_to_common_point_and_sizes_the_tail() {
        let plan = RollbackPlan::compute(&request(230, 200, 200)).expect("valid plan");
        assert_eq!(
            plan.recover_to_lsn, 200,
            "recover target is the common point"
        );
        assert_eq!(plan.tail_lsns, 30, "tail spans common_point..frontier");
        assert!(plan.has_divergent_tail());
    }

    #[test]
    fn plan_with_common_point_above_watermark_is_allowed() {
        // Agreement with the new primary may extend past the durable floor.
        // Only what lies above the common point is rolled back.
        let plan = RollbackPlan::compute(&request(300, 250, 200)).expect("valid plan");
        assert_eq!((plan.recover_to_lsn, plan.tail_lsns), (250, 50));
    }

    #[test]
    fn plan_refuses_common_point_below_watermark() {
        // HARD INVARIANT: nothing at or below the commit watermark is ever
        // rolled back. A common point of 150 under a watermark of 200 would
        // do exactly that, so the plan is refused.
        let refused = RollbackPlan::compute(&request(300, 150, 200));
        assert_eq!(
            refused,
            Err(RollbackError::WatermarkViolation {
                common_point: 150,
                commit_watermark: 200,
            })
        );
    }

    #[test]
    fn plan_at_watermark_is_the_inclusive_floor() {
        // A common point exactly on the watermark is allowed: the watermark
        // LSN is kept and only strictly-higher LSNs are rolled back.
        let plan = RollbackPlan::compute(&request(220, 200, 200)).expect("valid at floor");
        assert_eq!((plan.recover_to_lsn, plan.tail_lsns), (200, 20));
    }

    // ---- Full run: happy path ------------------------------------------

    #[test]
    fn run_preserves_tail_then_recovers_then_emits_then_rejoins() {
        let mut tx = FakeTransport::new(tail(&[201, 210, 230], 7));
        let outcome = RollbackCoordinator::run(&request(230, 200, 200), &mut tx)
            .expect("rollback succeeds");
        let expected_file = "/data/rollback/lsn-200-230.rbk";

        // Boundary: recovered to the common point, tail sized correctly.
        assert_eq!(outcome.recovered_to_lsn, 200);
        assert_eq!(outcome.tail_lsns, 30);
        assert!(outcome.rolled_back_tail());

        // The rollback file holds exactly the records above the common point.
        let persisted = tx.persisted.as_ref().expect("tail persisted");
        assert_eq!((persisted.common_point_lsn, persisted.to_lsn), (200, 230));
        assert_eq!(persisted.records, tail(&[201, 210, 230], 7));
        assert_eq!(outcome.rollback_file.as_deref(), Some(expected_file));

        // Recover-to-LSN targeted the common point.
        assert_eq!(tx.recovered_to, Some(200));

        // The loud operator event carries the boundary and the file.
        assert!(outcome.event_fired);
        let ev = tx.emitted.as_ref().expect("event emitted");
        assert_eq!(ev.common_point_lsn, 200);
        assert_eq!(ev.tail_to_lsn, 230);
        assert_eq!(ev.tail_lsns, 30);
        assert_eq!(ev.commit_watermark, 200);
        assert_eq!(ev.rollback_file, expected_file);
        assert_eq!(ev.new_term, NEW_TERM);

        // Rejoined as a replica of the new primary under the new term.
        assert_eq!(tx.rejoined, rejoined_new_primary());
        assert_eq!(
            outcome.role,
            NodeRole::Replica {
                primary_addr: NEW_PRIMARY.to_owned(),
                term: NEW_TERM,
            }
        );

        // Critical ordering: preserve before recover, emit before rejoin.
        assert_eq!(tx.order, ["read", "persist", "recover", "emit", "rejoin"]);
    }

    // ---- Full run: no divergent tail -> clean rejoin, no event ---------

    #[test]
    fn run_with_no_tail_just_rejoins() {
        // Frontier equals the common point: nothing diverged.
        let mut tx = FakeTransport::new(Vec::new());
        let outcome =
            RollbackCoordinator::run(&request(200, 200, 200), &mut tx).expect("clean rejoin");

        assert_eq!(outcome.tail_lsns, 0);
        assert!(!outcome.rolled_back_tail());
        assert!(!outcome.event_fired, "no event when nothing is discarded");
        assert_eq!(outcome.rollback_file, None);
        assert!(tx.persisted.is_none(), "nothing persisted");
        assert!(tx.recovered_to.is_none(), "nothing recovered");
        assert!(tx.emitted.is_none(), "no operator event");
        assert_eq!(tx.rejoined, rejoined_new_primary());
        assert_eq!(tx.order, ["rejoin"]);
    }

    #[test]
    fn run_with_frontier_below_common_point_is_a_clean_rejoin() {
        // Strictly behind the common point means no divergent tail; the node
        // just streams forward as a replica.
        let mut tx = FakeTransport::new(Vec::new());
        let outcome =
            RollbackCoordinator::run(&request(180, 200, 150), &mut tx).expect("clean rejoin");
        assert_eq!(outcome.recovered_to_lsn, 200);
        assert_eq!(outcome.tail_lsns, 0);
        assert!(!outcome.event_fired);
        assert_eq!(tx.order, ["rejoin"]);
    }

    // ---- Full run: refusal & abort paths -------------------------------

    #[test]
    fn run_refuses_when_common_point_below_watermark_and_touches_nothing() {
        let mut tx = FakeTransport::new(tail(&[160, 200, 300], 7));
        let err = RollbackCoordinator::run(&request(300, 150, 200), &mut tx)
            .expect_err("must refuse to cross the watermark");
        assert!(matches!(err, RollbackError::WatermarkViolation { .. }));

        // Not a single transport call: no read, persist, recover, emit or rejoin.
        assert!(tx.persisted.is_none());
        assert!(tx.recovered_to.is_none());
        assert!(tx.emitted.is_none());
        assert!(tx.rejoined.is_none());
        assert!(tx.order.is_empty());
    }

    #[test]
    fn run_aborts_without_recovering_when_tail_cannot_be_persisted() {
        // Rollback is never silent: a tail that cannot be saved means the
        // timeline is NOT rolled back.
        let mut tx = FakeTransport {
            persist_should_fail: true,
            ..FakeTransport::new(tail(&[210, 230], 7))
        };
        let err = RollbackCoordinator::run(&request(230, 200, 200), &mut tx)
            .expect_err("must abort when persist fails");
        assert!(matches!(err, RollbackError::TailPersistFailed { .. }));

        // Read and the attempted persist ran; recover, emit and rejoin did not.
        assert!(tx.recovered_to.is_none(), "must not roll back the timeline");
        assert!(tx.emitted.is_none());
        assert!(tx.rejoined.is_none());
        assert_eq!(tx.order, ["read", "persist"]);
    }

    #[test]
    fn run_surfaces_recover_failure_after_preserving_the_tail() {
        let mut tx = FakeTransport {
            recover_should_fail: true,
            ..FakeTransport::new(tail(&[210, 230], 7))
        };
        let err = RollbackCoordinator::run(&request(230, 200, 200), &mut tx)
            .expect_err("recover failure surfaces");
        match &err {
            RollbackError::RecoverFailed { target_lsn, .. } => assert_eq!(*target_lsn, 200),
            unexpected => panic!("expected RecoverFailed, got {unexpected:?}"),
        }

        // The tail was saved before the failed recover, so the writes survive.
        // The node did not rejoin on top of a half-rolled state.
        assert!(tx.persisted.is_some(), "tail preserved before recover");
        assert!(
            tx.emitted.is_none(),
            "no completion event on failed recover"
        );
        assert!(
            tx.rejoined.is_none(),
            "must not rejoin after a failed recover"
        );
        assert_eq!(tx.order, ["read", "persist", "recover"]);
    }

    #[test]
    fn span_lsns_counts_the_removed_range() {
        let removed = DivergentTail {
            common_point_lsn: 200,
            to_lsn: 230,
            records: tail(&[210, 230], 7),
        };
        assert_eq!(removed.span_lsns(), 30);
    }
}