Skip to main content

reddb_server/replication/
cascade.rs

1//! Cascading replication for async read-replicas (issue #838, PRD #819).
2//!
3//! # Why cascade
4//!
5//! A single primary streaming WAL to every replica pays an O(replicas)
6//! fan-out cost: each connected replica is one more stream the primary must
7//! frame, retain WAL for, and track. Read scale-out — adding many async
8//! read-replicas — therefore loads the *primary*, the one node whose spare
9//! capacity matters most because it also serves the write path.
10//!
11//! Cascading replication bounds that fan-out: an async read-replica may
12//! stream from an **intermediate** replica instead of from the primary. The
13//! intermediate holds the sub-replica's slot and forwards the WAL stream it
14//! is already receiving. The primary sees one stream (to the intermediate)
15//! regardless of how many sub-replicas hang off it.
16//!
17//! # Why voting members never cascade
18//!
19//! ADR 0030 keeps the durability/election path simple and fast: a quorum is a
20//! majority of *voting* members, and a synchronous write is acknowledged only
21//! once a quorum has it durably. If a voting member streamed through an
22//! intermediate, every commit-ack and every election-relevant frontier would
23//! pay an extra hop of lag, and an intermediate failure would stall a member
24//! the consensus path depends on. So the rule is categorical: **a voting
25//! member always streams directly from the primary**. Cascade is a
26//! read-scale-out optimisation for members that are *not* in the durability
27//! path. A voting member that is handed a cascade source refuses it and falls
28//! back to the primary (see [`plan_upstream`]).
29//!
30//! # Frontier propagation
31//!
32//! Correctness of the chain rests on one invariant: **the primary must not
33//! prune WAL that any node downstream of the chain still needs.** The
34//! intermediate enforces this by reporting to its own upstream a *retention
35//! frontier* that is the minimum of (a) what it has itself applied and (b)
36//! what every sub-replica streaming through it has confirmed
37//! ([`CascadeRelay::upstream_confirmed_lsn`]). A slow leaf therefore holds the
38//! whole chain's slot open at the primary, exactly as if it were connected
39//! directly — this is the cascaded analogue of PostgreSQL's
40//! `hot_standby_feedback`.
41//!
42//! The read-visibility frontier flows the same direction: a causal
43//! ([`CausalBookmark`]) read can only be satisfied at a node that has applied
44//! up to the bookmark's `commit_lsn`. Down the chain the applied frontier is
45//! monotonically non-increasing (a sub-replica can never be ahead of the
46//! intermediate that feeds it), so
47//! [`CascadeRelay::downstream_visible_frontier`] reports the highest LSN a
48//! given sub-replica can serve.
49//!
50//! # Module shape
51//!
52//! This module is pure policy + bookkeeping with no I/O: [`plan_upstream`]
53//! decides where a node connects, and [`CascadeRelay`] tracks the slots and
54//! frontiers an intermediate holds for its sub-replicas. The transport that
55//! actually forwards bytes composes these primitives, so the rules are
56//! unit-testable without a network — the same discipline the election core
57//! (issue #834) follows.
58
59use std::collections::BTreeMap;
60
61use crate::replication::bookmark::CausalBookmark;
62use crate::replication::election::{Member, MemberKind, VotingState};
63
64// ---------------------------------------------------------------
65// Streaming class — who may cascade
66// ---------------------------------------------------------------
67
68/// How a node chooses its WAL upstream.
69///
70/// This is orthogonal to the election [`MemberKind`]/[`VotingState`] model: a
71/// node's *streaming class* answers "may this node accept a cascade source?",
72/// where the membership model answers "does this node vote / can it become
73/// primary?". A witness has no data stream at all, so it is irrelevant here;
74/// the meaningful split is between members on the durability path (which must
75/// stream directly) and pure read scale-out replicas (which may cascade).
76#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
77pub enum ReplicaClass {
78    /// Participates in the durability/election quorum. Streams **directly**
79    /// from the primary and refuses any cascade source (ADR 0030). This is
80    /// the safe default: a node only cascades when explicitly declared a
81    /// read-replica.
82    #[default]
83    Voting,
84    /// Async read-scale-out replica. Not in the durability path, so it **may**
85    /// stream from an intermediate replica to bound the primary's fan-out.
86    AsyncReadReplica,
87}
88
89impl ReplicaClass {
90    /// Derive the streaming class from an election membership view.
91    ///
92    /// Any member that currently counts toward quorum
93    /// ([`Member::is_voter`]) is on the durability path and must stream
94    /// directly; a data member that is non-voting (e.g. a read-replica that
95    /// never joins the voter set) may cascade. This lets a caller that
96    /// already holds a [`Member`] derive the cascade policy without
97    /// re-declaring intent.
98    pub fn from_member(member: &Member) -> Self {
99        // A witness carries no data stream; a voting member is on the
100        // durability path. Either way it must not cascade. Only a
101        // non-voting *data* member is a candidate for read scale-out.
102        match (member.kind, member.is_voter()) {
103            (MemberKind::Data, false) => ReplicaClass::AsyncReadReplica,
104            _ => ReplicaClass::Voting,
105        }
106    }
107
108    /// Whether a node of this class is permitted to stream from an
109    /// intermediate replica rather than the primary.
110    pub fn may_cascade(self) -> bool {
111        matches!(self, ReplicaClass::AsyncReadReplica)
112    }
113}
114
115/// An intermediate replica a sub-replica may cascade from.
116#[derive(Debug, Clone, PartialEq, Eq)]
117pub struct CascadeUpstream {
118    /// Stable node identity of the intermediate (matches its replica id).
119    pub node_id: String,
120    /// Address the sub-replica connects to in order to stream from the
121    /// intermediate (e.g. `"http://replica-a:50051"`).
122    pub addr: String,
123}
124
125impl CascadeUpstream {
126    pub fn new(node_id: impl Into<String>, addr: impl Into<String>) -> Self {
127        Self {
128            node_id: node_id.into(),
129            addr: addr.into(),
130        }
131    }
132}
133
134/// Where a node should open its WAL stream.
135#[derive(Debug, Clone, PartialEq, Eq)]
136pub enum UpstreamChoice {
137    /// Connect directly to the primary.
138    Primary,
139    /// Cascade from the named intermediate replica.
140    Intermediate(CascadeUpstream),
141}
142
143impl UpstreamChoice {
144    /// `true` when this choice streams from an intermediate (a cascade).
145    pub fn is_cascade(&self) -> bool {
146        matches!(self, UpstreamChoice::Intermediate(_))
147    }
148}
149
150/// Why a requested cascade source was refused and the node fell back to the
151/// primary. Surfaced (not swallowed) so a misconfiguration is observable
152/// rather than a silent performance cliff.
153#[derive(Debug, Clone, Copy, PartialEq, Eq)]
154pub enum CascadeRefusal {
155    /// The node is on the durability path; a voting member always streams
156    /// directly from the primary (ADR 0030).
157    VotingMemberDirectOnly,
158    /// The requested intermediate is the node itself — a node cannot cascade
159    /// from its own slot.
160    SelfReference,
161}
162
163impl CascadeRefusal {
164    pub fn as_str(self) -> &'static str {
165        match self {
166            Self::VotingMemberDirectOnly => "voting-member-direct-only",
167            Self::SelfReference => "self-reference",
168        }
169    }
170}
171
172/// Decide where a node streams from, given its streaming class and an
173/// optionally-requested intermediate source.
174///
175/// The decision is total and side-effect-free:
176///
177/// * No requested intermediate → [`UpstreamChoice::Primary`], no refusal.
178/// * Requested, but the node is a voting member → refuse with
179///   [`CascadeRefusal::VotingMemberDirectOnly`] and fall back to the primary.
180/// * Requested, but the intermediate is this node itself → refuse with
181///   [`CascadeRefusal::SelfReference`] and fall back to the primary.
182/// * Requested, node is an async read-replica, source is another node →
183///   [`UpstreamChoice::Intermediate`], no refusal.
184///
185/// Returning the refusal alongside the (safe) fallback choice lets the caller
186/// honour the connection immediately while still logging *why* a configured
187/// cascade did not take effect.
188pub fn plan_upstream(
189    self_node_id: &str,
190    class: ReplicaClass,
191    requested: Option<&CascadeUpstream>,
192) -> (UpstreamChoice, Option<CascadeRefusal>) {
193    let Some(upstream) = requested else {
194        return (UpstreamChoice::Primary, None);
195    };
196    if !class.may_cascade() {
197        return (
198            UpstreamChoice::Primary,
199            Some(CascadeRefusal::VotingMemberDirectOnly),
200        );
201    }
202    if upstream.node_id == self_node_id {
203        return (UpstreamChoice::Primary, Some(CascadeRefusal::SelfReference));
204    }
205    (UpstreamChoice::Intermediate(upstream.clone()), None)
206}
207
208// ---------------------------------------------------------------
209// CascadeRelay — an intermediate that holds slots and forwards
210// ---------------------------------------------------------------
211
212/// A sub-replica slot held by an intermediate.
213#[derive(Debug, Clone, PartialEq, Eq)]
214pub struct DownstreamSlot {
215    /// Identity of the sub-replica streaming through this intermediate.
216    pub id: String,
217    /// Highest LSN the sub-replica has confirmed durably applied. Drives the
218    /// retention frontier reported upstream — the intermediate must keep WAL
219    /// above this point so it can still forward it.
220    pub confirmed_lsn: u64,
221    /// Highest LSN forwarded to the sub-replica so far. Always
222    /// `>= confirmed_lsn`; the gap is in-flight, not yet acked.
223    pub sent_lsn: u64,
224}
225
226/// Tracks the sub-replica slots an intermediate holds and the frontiers that
227/// must propagate through the chain. Pure bookkeeping — the forwarding
228/// transport calls into it to decide what to send and what to advertise
229/// upstream.
230///
231/// All LSN updates are monotonic: a stale ack or a duplicate forward can never
232/// rewind a frontier, which keeps retention safe under reordering and retries.
233#[derive(Debug, Clone)]
234pub struct CascadeRelay {
235    node_id: String,
236    /// What this intermediate has itself applied from its own upstream. It can
237    /// never forward beyond this point — it cannot forward records it does not
238    /// yet hold.
239    self_applied_lsn: u64,
240    downstream: BTreeMap<String, DownstreamSlot>,
241}
242
243impl CascadeRelay {
244    pub fn new(node_id: impl Into<String>) -> Self {
245        Self {
246            node_id: node_id.into(),
247            self_applied_lsn: 0,
248            downstream: BTreeMap::new(),
249        }
250    }
251
252    pub fn node_id(&self) -> &str {
253        &self.node_id
254    }
255
256    /// Record how far this intermediate has applied from its own upstream.
257    /// Monotonic — a late report never rewinds the forward bound.
258    pub fn record_self_applied(&mut self, lsn: u64) {
259        self.self_applied_lsn = self.self_applied_lsn.max(lsn);
260    }
261
262    pub fn self_applied_lsn(&self) -> u64 {
263        self.self_applied_lsn
264    }
265
266    /// Hold a sub-replica's slot, resuming it at `start_lsn`.
267    ///
268    /// Idempotent on reconnect (issue #812 semantics): if the slot already
269    /// exists its progress is preserved — only a *forward* `start_lsn` can
270    /// advance `confirmed_lsn`, never rewind it — so a reconnecting
271    /// sub-replica is not pushed backwards. Returns the LSN the sub-replica
272    /// should resume streaming from (its retained confirmed position).
273    pub fn register_downstream(&mut self, id: impl Into<String>, start_lsn: u64) -> u64 {
274        let id = id.into();
275        let slot = self
276            .downstream
277            .entry(id.clone())
278            .or_insert_with(|| DownstreamSlot {
279                id,
280                confirmed_lsn: start_lsn,
281                sent_lsn: start_lsn,
282            });
283        // Only advance on (re)registration; never rewind a live slot.
284        slot.confirmed_lsn = slot.confirmed_lsn.max(start_lsn);
285        slot.sent_lsn = slot.sent_lsn.max(slot.confirmed_lsn);
286        slot.confirmed_lsn
287    }
288
289    /// Release a sub-replica's slot. Returns `true` if it was held. After
290    /// this, the released sub-replica no longer pins the chain's retention
291    /// frontier.
292    pub fn unregister_downstream(&mut self, id: &str) -> bool {
293        self.downstream.remove(id).is_some()
294    }
295
296    /// Record a sub-replica's confirmation that it has durably applied up to
297    /// `lsn`. Monotonic. No-op for an unknown id.
298    pub fn record_downstream_ack(&mut self, id: &str, lsn: u64) {
299        if let Some(slot) = self.downstream.get_mut(id) {
300            slot.confirmed_lsn = slot.confirmed_lsn.max(lsn);
301            slot.sent_lsn = slot.sent_lsn.max(slot.confirmed_lsn);
302        }
303    }
304
305    /// Note that records up to `lsn` were forwarded to a sub-replica.
306    /// Monotonic. No-op for an unknown id.
307    pub fn note_forwarded(&mut self, id: &str, lsn: u64) {
308        if let Some(slot) = self.downstream.get_mut(id) {
309            slot.sent_lsn = slot.sent_lsn.max(lsn);
310        }
311    }
312
313    pub fn downstream_ids(&self) -> Vec<String> {
314        self.downstream.keys().cloned().collect()
315    }
316
317    pub fn downstream_slot(&self, id: &str) -> Option<&DownstreamSlot> {
318        self.downstream.get(id)
319    }
320
321    pub fn downstream_count(&self) -> usize {
322        self.downstream.len()
323    }
324
325    /// The retention frontier this intermediate reports to its own upstream
326    /// (the primary, or a further intermediate).
327    ///
328    /// This is the crux of chain correctness: it is the minimum of what the
329    /// intermediate has itself applied and what *every* sub-replica has
330    /// confirmed. The upstream retains WAL at or above this point, so a slow
331    /// leaf keeps the whole chain's slot open — the primary never prunes a
332    /// record some downstream node still needs.
333    ///
334    /// With no sub-replicas the frontier is simply the intermediate's own
335    /// applied position (it behaves like an ordinary direct replica).
336    pub fn upstream_confirmed_lsn(&self) -> u64 {
337        match self
338            .downstream
339            .values()
340            .map(|slot| slot.confirmed_lsn)
341            .min()
342        {
343            // Clamp the slowest leaf by what we actually hold: a leaf can
344            // never need WAL beyond what the intermediate has applied.
345            Some(min_downstream) => min_downstream.min(self.self_applied_lsn),
346            None => self.self_applied_lsn,
347        }
348    }
349
350    /// The retention frontier as a causal bookmark, stamped with `term`.
351    /// Lets the chain advertise its safe-to-prune point in the same token
352    /// vocabulary causal reads use (ADR 0031).
353    pub fn upstream_confirmed_bookmark(&self, term: u64) -> CausalBookmark {
354        CausalBookmark::new(term, self.upstream_confirmed_lsn())
355    }
356
357    /// The highest LSN a given sub-replica can currently serve for a causal
358    /// read — the minimum of the intermediate's applied frontier and the
359    /// sub-replica's own confirmed position. Down the chain this is
360    /// monotonically non-increasing, so a bookmark read routes to a node only
361    /// if that node's visible frontier covers the bookmark's `commit_lsn`.
362    ///
363    /// Returns `None` for an unknown sub-replica.
364    pub fn downstream_visible_frontier(&self, id: &str) -> Option<u64> {
365        self.downstream
366            .get(id)
367            .map(|slot| slot.confirmed_lsn.min(self.self_applied_lsn))
368    }
369
370    /// Whether a sub-replica can satisfy a read at `bookmark`. True only when
371    /// its visible frontier covers the bookmark's commit LSN.
372    pub fn downstream_can_serve(&self, id: &str, bookmark: &CausalBookmark) -> bool {
373        self.downstream_visible_frontier(id)
374            .is_some_and(|frontier| frontier >= bookmark.commit_lsn())
375    }
376
377    /// Select the records to forward to a sub-replica from a batch the
378    /// intermediate has on hand.
379    ///
380    /// `available` is a slice of `(lsn, payload)` the intermediate has
381    /// received from its own upstream, assumed ascending by LSN. The result
382    /// keeps every record with `requested_since_lsn < lsn <= self_applied_lsn`
383    /// — newer than what the sub-replica has, and not beyond what the
384    /// intermediate itself holds. Records the intermediate has buffered but
385    /// not yet applied are withheld, so a sub-replica never sees data ahead of
386    /// its feeder.
387    pub fn records_to_forward<'a, T>(
388        &self,
389        requested_since_lsn: u64,
390        available: &'a [(u64, T)],
391    ) -> Vec<&'a (u64, T)> {
392        let ceiling = self.self_applied_lsn;
393        available
394            .iter()
395            .filter(|(lsn, _)| *lsn > requested_since_lsn && *lsn <= ceiling)
396            .collect()
397    }
398}
399
400// ---------------------------------------------------------------
401// Tests
402// ---------------------------------------------------------------
403
404#[cfg(test)]
405mod tests {
406    use super::*;
407    use crate::replication::election::Member;
408
409    // -- plan_upstream -------------------------------------------------
410
411    #[test]
412    fn no_requested_source_streams_from_primary() {
413        let (choice, refusal) = plan_upstream("r1", ReplicaClass::AsyncReadReplica, None);
414        assert_eq!(choice, UpstreamChoice::Primary);
415        assert!(refusal.is_none());
416    }
417
418    #[test]
419    fn async_read_replica_cascades_from_intermediate() {
420        let up = CascadeUpstream::new("inter", "http://inter:50051");
421        let (choice, refusal) = plan_upstream("leaf", ReplicaClass::AsyncReadReplica, Some(&up));
422        assert!(choice.is_cascade());
423        assert_eq!(choice, UpstreamChoice::Intermediate(up));
424        assert!(refusal.is_none());
425    }
426
427    #[test]
428    fn voting_member_refuses_cascade_and_falls_back_to_primary() {
429        let up = CascadeUpstream::new("inter", "http://inter:50051");
430        let (choice, refusal) = plan_upstream("voter", ReplicaClass::Voting, Some(&up));
431        assert_eq!(choice, UpstreamChoice::Primary);
432        assert_eq!(refusal, Some(CascadeRefusal::VotingMemberDirectOnly));
433    }
434
435    #[test]
436    fn node_refuses_to_cascade_from_itself() {
437        let up = CascadeUpstream::new("self", "http://self:50051");
438        let (choice, refusal) = plan_upstream("self", ReplicaClass::AsyncReadReplica, Some(&up));
439        assert_eq!(choice, UpstreamChoice::Primary);
440        assert_eq!(refusal, Some(CascadeRefusal::SelfReference));
441    }
442
443    #[test]
444    fn class_from_member_keeps_voters_direct() {
445        assert_eq!(
446            ReplicaClass::from_member(&Member::data_voting("v")),
447            ReplicaClass::Voting
448        );
449        assert_eq!(
450            ReplicaClass::from_member(&Member::witness("w")),
451            ReplicaClass::Voting
452        );
453        // A non-voting data member is a read-replica candidate.
454        assert_eq!(
455            ReplicaClass::from_member(&Member::data_catching_up("c")),
456            ReplicaClass::AsyncReadReplica
457        );
458    }
459
460    // -- CascadeRelay --------------------------------------------------
461
462    #[test]
463    fn relay_with_no_downstream_reports_own_applied_frontier() {
464        let mut relay = CascadeRelay::new("inter");
465        relay.record_self_applied(42);
466        assert_eq!(relay.upstream_confirmed_lsn(), 42);
467    }
468
469    #[test]
470    fn register_downstream_holds_slot_and_is_idempotent() {
471        let mut relay = CascadeRelay::new("inter");
472        assert_eq!(relay.register_downstream("leaf", 10), 10);
473        relay.record_downstream_ack("leaf", 25);
474        // Reconnect at an older start must not rewind the live slot.
475        assert_eq!(relay.register_downstream("leaf", 5), 25);
476        assert_eq!(relay.downstream_count(), 1);
477    }
478
479    #[test]
480    fn slow_leaf_pins_chain_retention_frontier() {
481        let mut relay = CascadeRelay::new("inter");
482        relay.record_self_applied(100);
483        relay.register_downstream("fast", 0);
484        relay.register_downstream("slow", 0);
485        relay.record_downstream_ack("fast", 90);
486        relay.record_downstream_ack("slow", 40);
487        // The intermediate must keep WAL the slow leaf still needs: the
488        // frontier it reports upstream is the slowest leaf, not its own
489        // applied position.
490        assert_eq!(relay.upstream_confirmed_lsn(), 40);
491
492        // Slow leaf catches up → frontier advances to min(self, fast).
493        relay.record_downstream_ack("slow", 95);
494        assert_eq!(relay.upstream_confirmed_lsn(), 90);
495
496        // Both pass self_applied is impossible (can't confirm un-forwarded
497        // data); clamp holds at self_applied.
498        relay.record_downstream_ack("fast", 100);
499        relay.record_downstream_ack("slow", 100);
500        assert_eq!(relay.upstream_confirmed_lsn(), 100);
501    }
502
503    #[test]
504    fn releasing_slow_leaf_unblocks_frontier() {
505        let mut relay = CascadeRelay::new("inter");
506        relay.record_self_applied(100);
507        relay.register_downstream("slow", 0);
508        relay.record_downstream_ack("slow", 10);
509        assert_eq!(relay.upstream_confirmed_lsn(), 10);
510        assert!(relay.unregister_downstream("slow"));
511        assert_eq!(relay.upstream_confirmed_lsn(), 100);
512        assert!(!relay.unregister_downstream("slow"));
513    }
514
515    #[test]
516    fn acks_and_forwards_are_monotonic() {
517        let mut relay = CascadeRelay::new("inter");
518        relay.record_self_applied(50);
519        relay.register_downstream("leaf", 0);
520        relay.record_downstream_ack("leaf", 30);
521        relay.record_downstream_ack("leaf", 20); // stale, ignored
522        relay.note_forwarded("leaf", 45);
523        relay.note_forwarded("leaf", 10); // stale, ignored
524        let slot = relay.downstream_slot("leaf").unwrap();
525        assert_eq!(slot.confirmed_lsn, 30);
526        assert_eq!(slot.sent_lsn, 45);
527        relay.record_self_applied(20); // stale, ignored
528        assert_eq!(relay.self_applied_lsn(), 50);
529    }
530
531    #[test]
532    fn records_to_forward_bounds_by_since_and_self_applied() {
533        let mut relay = CascadeRelay::new("inter");
534        relay.record_self_applied(4);
535        let available: Vec<(u64, &str)> =
536            vec![(1, "a"), (2, "b"), (3, "c"), (4, "d"), (5, "e"), (6, "f")];
537        // since=2 → forward 3,4 (5,6 withheld: not yet applied here).
538        let picked = relay.records_to_forward(2, &available);
539        let lsns: Vec<u64> = picked.iter().map(|(lsn, _)| *lsn).collect();
540        assert_eq!(lsns, vec![3, 4]);
541
542        // After applying more, the ceiling rises.
543        relay.record_self_applied(6);
544        let picked = relay.records_to_forward(2, &available);
545        let lsns: Vec<u64> = picked.iter().map(|(lsn, _)| *lsn).collect();
546        assert_eq!(lsns, vec![3, 4, 5, 6]);
547    }
548
549    #[test]
550    fn visible_frontier_is_monotonically_non_increasing_down_chain() {
551        let mut relay = CascadeRelay::new("inter");
552        relay.record_self_applied(80);
553        relay.register_downstream("leaf", 0);
554        relay.record_downstream_ack("leaf", 60);
555        // The leaf can serve up to its own confirmed, never beyond the feeder.
556        assert_eq!(relay.downstream_visible_frontier("leaf"), Some(60));
557
558        // If the leaf somehow confirms beyond the feeder (shouldn't happen),
559        // the visible frontier still clamps at self_applied.
560        relay.record_downstream_ack("leaf", 200);
561        assert_eq!(relay.downstream_visible_frontier("leaf"), Some(80));
562        assert_eq!(relay.downstream_visible_frontier("unknown"), None);
563    }
564
565    #[test]
566    fn downstream_can_serve_bookmark_only_when_frontier_covers_it() {
567        let mut relay = CascadeRelay::new("inter");
568        relay.record_self_applied(100);
569        relay.register_downstream("leaf", 0);
570        relay.record_downstream_ack("leaf", 50);
571        let within = CausalBookmark::new(1, 50);
572        let beyond = CausalBookmark::new(1, 51);
573        assert!(relay.downstream_can_serve("leaf", &within));
574        assert!(!relay.downstream_can_serve("leaf", &beyond));
575        assert!(!relay.downstream_can_serve("missing", &within));
576    }
577
578    #[test]
579    fn upstream_confirmed_bookmark_stamps_term() {
580        let mut relay = CascadeRelay::new("inter");
581        relay.record_self_applied(100);
582        relay.register_downstream("leaf", 0);
583        relay.record_downstream_ack("leaf", 70);
584        let bm = relay.upstream_confirmed_bookmark(7);
585        assert_eq!(bm.term(), 7);
586        assert_eq!(bm.commit_lsn(), 70);
587    }
588}