reddb_server/replication/cascade.rs
1//! Cascading replication for async read-replicas (issue #838, PRD #819).
2//!
3//! # Why cascade
4//!
5//! A single primary streaming WAL to every replica pays an O(replicas)
6//! fan-out cost: each connected replica is one more stream the primary must
7//! frame, retain WAL for, and track. Read scale-out — adding many async
8//! read-replicas — therefore loads the *primary*, the one node whose spare
9//! capacity matters most because it also serves the write path.
10//!
11//! Cascading replication bounds that fan-out: an async read-replica may
12//! stream from an **intermediate** replica instead of from the primary. The
13//! intermediate holds the sub-replica's slot and forwards the WAL stream it
14//! is already receiving. The primary sees one stream (to the intermediate)
15//! regardless of how many sub-replicas hang off it.
16//!
17//! # Why voting members never cascade
18//!
19//! ADR 0030 keeps the durability/election path simple and fast: a quorum is a
20//! majority of *voting* members, and a synchronous write is acknowledged only
21//! once a quorum has it durably. If a voting member streamed through an
22//! intermediate, every commit-ack and every election-relevant frontier would
23//! pay an extra hop of lag, and an intermediate failure would stall a member
24//! the consensus path depends on. So the rule is categorical: **a voting
25//! member always streams directly from the primary**. Cascade is a
26//! read-scale-out optimisation for members that are *not* in the durability
27//! path. A voting member that is handed a cascade source refuses it and falls
28//! back to the primary (see [`plan_upstream`]).
29//!
30//! # Frontier propagation
31//!
32//! Correctness of the chain rests on one invariant: **the primary must not
33//! prune WAL that any node downstream of the chain still needs.** The
34//! intermediate enforces this by reporting to its own upstream a *retention
35//! frontier* that is the minimum of (a) what it has itself applied and (b)
36//! what every sub-replica streaming through it has confirmed
37//! ([`CascadeRelay::upstream_confirmed_lsn`]). A slow leaf therefore holds the
38//! whole chain's slot open at the primary, exactly as if it were connected
39//! directly — this is the cascaded analogue of PostgreSQL's
40//! `hot_standby_feedback`.
41//!
42//! The read-visibility frontier flows the same direction: a causal
43//! ([`CausalBookmark`]) read can only be satisfied at a node that has applied
44//! up to the bookmark's `commit_lsn`. Down the chain the applied frontier is
45//! monotonically non-increasing (a sub-replica can never be ahead of the
46//! intermediate that feeds it), so
47//! [`CascadeRelay::downstream_visible_frontier`] reports the highest LSN a
48//! given sub-replica can serve.
49//!
50//! # Module shape
51//!
52//! This module is pure policy + bookkeeping with no I/O: [`plan_upstream`]
53//! decides where a node connects, and [`CascadeRelay`] tracks the slots and
54//! frontiers an intermediate holds for its sub-replicas. The transport that
55//! actually forwards bytes composes these primitives, so the rules are
56//! unit-testable without a network — the same discipline the election core
57//! (issue #834) follows.
58
59use std::collections::BTreeMap;
60
61use crate::replication::bookmark::CausalBookmark;
62use crate::replication::election::{Member, MemberKind, VotingState};
63
64// ---------------------------------------------------------------
65// Streaming class — who may cascade
66// ---------------------------------------------------------------
67
/// Streaming class of a node: whether it may take its WAL from an
/// intermediate replica or must connect straight to the primary.
///
/// Kept separate from the election [`MemberKind`]/[`VotingState`] model on
/// purpose. Membership answers "does this node vote / can it become
/// primary?"; the streaming class answers only "may this node accept a
/// cascade source?". A witness has no data stream, so the split that matters
/// here is between durability-path members (direct only) and pure read
/// scale-out replicas (may cascade).
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum ReplicaClass {
    /// On the durability/election quorum path. Always streams **directly**
    /// from the primary and rejects any cascade source (ADR 0030). This is
    /// the default variant, so cascading is strictly opt-in.
    #[default]
    Voting,
    /// Asynchronous read scale-out replica. Outside the durability path, so
    /// it **may** stream from an intermediate replica to keep the primary's
    /// fan-out bounded.
    AsyncReadReplica,
}
88
89impl ReplicaClass {
90 /// Derive the streaming class from an election membership view.
91 ///
92 /// Any member that currently counts toward quorum
93 /// ([`Member::is_voter`]) is on the durability path and must stream
94 /// directly; a data member that is non-voting (e.g. a read-replica that
95 /// never joins the voter set) may cascade. This lets a caller that
96 /// already holds a [`Member`] derive the cascade policy without
97 /// re-declaring intent.
98 pub fn from_member(member: &Member) -> Self {
99 // A witness carries no data stream; a voting member is on the
100 // durability path. Either way it must not cascade. Only a
101 // non-voting *data* member is a candidate for read scale-out.
102 match (member.kind, member.is_voter()) {
103 (MemberKind::Data, false) => ReplicaClass::AsyncReadReplica,
104 _ => ReplicaClass::Voting,
105 }
106 }
107
108 /// Whether a node of this class is permitted to stream from an
109 /// intermediate replica rather than the primary.
110 pub fn may_cascade(self) -> bool {
111 matches!(self, ReplicaClass::AsyncReadReplica)
112 }
113}
114
/// Describes an intermediate replica that a sub-replica may stream from.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct CascadeUpstream {
    /// Stable node identity of the intermediate (the same value as its
    /// replica id).
    pub node_id: String,
    /// Address a sub-replica dials to stream from the intermediate, for
    /// example `"http://replica-a:50051"`.
    pub addr: String,
}
124
125impl CascadeUpstream {
126 pub fn new(node_id: impl Into<String>, addr: impl Into<String>) -> Self {
127 Self {
128 node_id: node_id.into(),
129 addr: addr.into(),
130 }
131 }
132}
133
134/// Where a node should open its WAL stream.
135#[derive(Debug, Clone, PartialEq, Eq)]
136pub enum UpstreamChoice {
137 /// Connect directly to the primary.
138 Primary,
139 /// Cascade from the named intermediate replica.
140 Intermediate(CascadeUpstream),
141}
142
143impl UpstreamChoice {
144 /// `true` when this choice streams from an intermediate (a cascade).
145 pub fn is_cascade(&self) -> bool {
146 matches!(self, UpstreamChoice::Intermediate(_))
147 }
148}
149
/// Reason a requested cascade source was rejected, causing the node to fall
/// back to the primary. It is reported to the caller rather than swallowed so
/// that a misconfiguration shows up explicitly instead of as a silent
/// performance cliff.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CascadeRefusal {
    /// The node is on the durability path; voting members always stream
    /// directly from the primary (ADR 0030).
    VotingMemberDirectOnly,
    /// The requested intermediate is the node itself; a node cannot cascade
    /// from its own slot.
    SelfReference,
}
162
163impl CascadeRefusal {
164 pub fn as_str(self) -> &'static str {
165 match self {
166 Self::VotingMemberDirectOnly => "voting-member-direct-only",
167 Self::SelfReference => "self-reference",
168 }
169 }
170}
171
172/// Decide where a node streams from, given its streaming class and an
173/// optionally-requested intermediate source.
174///
175/// The decision is total and side-effect-free:
176///
177/// * No requested intermediate → [`UpstreamChoice::Primary`], no refusal.
178/// * Requested, but the node is a voting member → refuse with
179/// [`CascadeRefusal::VotingMemberDirectOnly`] and fall back to the primary.
180/// * Requested, but the intermediate is this node itself → refuse with
181/// [`CascadeRefusal::SelfReference`] and fall back to the primary.
182/// * Requested, node is an async read-replica, source is another node →
183/// [`UpstreamChoice::Intermediate`], no refusal.
184///
185/// Returning the refusal alongside the (safe) fallback choice lets the caller
186/// honour the connection immediately while still logging *why* a configured
187/// cascade did not take effect.
188pub fn plan_upstream(
189 self_node_id: &str,
190 class: ReplicaClass,
191 requested: Option<&CascadeUpstream>,
192) -> (UpstreamChoice, Option<CascadeRefusal>) {
193 let Some(upstream) = requested else {
194 return (UpstreamChoice::Primary, None);
195 };
196 if !class.may_cascade() {
197 return (
198 UpstreamChoice::Primary,
199 Some(CascadeRefusal::VotingMemberDirectOnly),
200 );
201 }
202 if upstream.node_id == self_node_id {
203 return (UpstreamChoice::Primary, Some(CascadeRefusal::SelfReference));
204 }
205 (UpstreamChoice::Intermediate(upstream.clone()), None)
206}
207
208// ---------------------------------------------------------------
209// CascadeRelay — an intermediate that holds slots and forwards
210// ---------------------------------------------------------------
211
/// Bookkeeping for one sub-replica slot held by an intermediate.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct DownstreamSlot {
    /// Identity of the sub-replica that streams through this intermediate.
    pub id: String,
    /// Highest LSN the sub-replica has confirmed as durably applied. This
    /// feeds the retention frontier reported upstream: the intermediate has
    /// to keep WAL past this point so that it can still forward it.
    pub confirmed_lsn: u64,
    /// Highest LSN forwarded to the sub-replica so far. Always
    /// `>= confirmed_lsn`; the difference is in flight and not yet acked.
    pub sent_lsn: u64,
}
225
226/// Tracks the sub-replica slots an intermediate holds and the frontiers that
227/// must propagate through the chain. Pure bookkeeping — the forwarding
228/// transport calls into it to decide what to send and what to advertise
229/// upstream.
230///
231/// All LSN updates are monotonic: a stale ack or a duplicate forward can never
232/// rewind a frontier, which keeps retention safe under reordering and retries.
233#[derive(Debug, Clone)]
234pub struct CascadeRelay {
235 node_id: String,
236 /// What this intermediate has itself applied from its own upstream. It can
237 /// never forward beyond this point — it cannot forward records it does not
238 /// yet hold.
239 self_applied_lsn: u64,
240 downstream: BTreeMap<String, DownstreamSlot>,
241}
242
243impl CascadeRelay {
244 pub fn new(node_id: impl Into<String>) -> Self {
245 Self {
246 node_id: node_id.into(),
247 self_applied_lsn: 0,
248 downstream: BTreeMap::new(),
249 }
250 }
251
252 pub fn node_id(&self) -> &str {
253 &self.node_id
254 }
255
256 /// Record how far this intermediate has applied from its own upstream.
257 /// Monotonic — a late report never rewinds the forward bound.
258 pub fn record_self_applied(&mut self, lsn: u64) {
259 self.self_applied_lsn = self.self_applied_lsn.max(lsn);
260 }
261
262 pub fn self_applied_lsn(&self) -> u64 {
263 self.self_applied_lsn
264 }
265
266 /// Hold a sub-replica's slot, resuming it at `start_lsn`.
267 ///
268 /// Idempotent on reconnect (issue #812 semantics): if the slot already
269 /// exists its progress is preserved — only a *forward* `start_lsn` can
270 /// advance `confirmed_lsn`, never rewind it — so a reconnecting
271 /// sub-replica is not pushed backwards. Returns the LSN the sub-replica
272 /// should resume streaming from (its retained confirmed position).
273 pub fn register_downstream(&mut self, id: impl Into<String>, start_lsn: u64) -> u64 {
274 let id = id.into();
275 let slot = self
276 .downstream
277 .entry(id.clone())
278 .or_insert_with(|| DownstreamSlot {
279 id,
280 confirmed_lsn: start_lsn,
281 sent_lsn: start_lsn,
282 });
283 // Only advance on (re)registration; never rewind a live slot.
284 slot.confirmed_lsn = slot.confirmed_lsn.max(start_lsn);
285 slot.sent_lsn = slot.sent_lsn.max(slot.confirmed_lsn);
286 slot.confirmed_lsn
287 }
288
289 /// Release a sub-replica's slot. Returns `true` if it was held. After
290 /// this, the released sub-replica no longer pins the chain's retention
291 /// frontier.
292 pub fn unregister_downstream(&mut self, id: &str) -> bool {
293 self.downstream.remove(id).is_some()
294 }
295
296 /// Record a sub-replica's confirmation that it has durably applied up to
297 /// `lsn`. Monotonic. No-op for an unknown id.
298 pub fn record_downstream_ack(&mut self, id: &str, lsn: u64) {
299 if let Some(slot) = self.downstream.get_mut(id) {
300 slot.confirmed_lsn = slot.confirmed_lsn.max(lsn);
301 slot.sent_lsn = slot.sent_lsn.max(slot.confirmed_lsn);
302 }
303 }
304
305 /// Note that records up to `lsn` were forwarded to a sub-replica.
306 /// Monotonic. No-op for an unknown id.
307 pub fn note_forwarded(&mut self, id: &str, lsn: u64) {
308 if let Some(slot) = self.downstream.get_mut(id) {
309 slot.sent_lsn = slot.sent_lsn.max(lsn);
310 }
311 }
312
313 pub fn downstream_ids(&self) -> Vec<String> {
314 self.downstream.keys().cloned().collect()
315 }
316
317 pub fn downstream_slot(&self, id: &str) -> Option<&DownstreamSlot> {
318 self.downstream.get(id)
319 }
320
321 pub fn downstream_count(&self) -> usize {
322 self.downstream.len()
323 }
324
325 /// The retention frontier this intermediate reports to its own upstream
326 /// (the primary, or a further intermediate).
327 ///
328 /// This is the crux of chain correctness: it is the minimum of what the
329 /// intermediate has itself applied and what *every* sub-replica has
330 /// confirmed. The upstream retains WAL at or above this point, so a slow
331 /// leaf keeps the whole chain's slot open — the primary never prunes a
332 /// record some downstream node still needs.
333 ///
334 /// With no sub-replicas the frontier is simply the intermediate's own
335 /// applied position (it behaves like an ordinary direct replica).
336 pub fn upstream_confirmed_lsn(&self) -> u64 {
337 match self
338 .downstream
339 .values()
340 .map(|slot| slot.confirmed_lsn)
341 .min()
342 {
343 // Clamp the slowest leaf by what we actually hold: a leaf can
344 // never need WAL beyond what the intermediate has applied.
345 Some(min_downstream) => min_downstream.min(self.self_applied_lsn),
346 None => self.self_applied_lsn,
347 }
348 }
349
350 /// The retention frontier as a causal bookmark, stamped with `term`.
351 /// Lets the chain advertise its safe-to-prune point in the same token
352 /// vocabulary causal reads use (ADR 0031).
353 pub fn upstream_confirmed_bookmark(&self, term: u64) -> CausalBookmark {
354 CausalBookmark::new(term, self.upstream_confirmed_lsn())
355 }
356
357 /// The highest LSN a given sub-replica can currently serve for a causal
358 /// read — the minimum of the intermediate's applied frontier and the
359 /// sub-replica's own confirmed position. Down the chain this is
360 /// monotonically non-increasing, so a bookmark read routes to a node only
361 /// if that node's visible frontier covers the bookmark's `commit_lsn`.
362 ///
363 /// Returns `None` for an unknown sub-replica.
364 pub fn downstream_visible_frontier(&self, id: &str) -> Option<u64> {
365 self.downstream
366 .get(id)
367 .map(|slot| slot.confirmed_lsn.min(self.self_applied_lsn))
368 }
369
370 /// Whether a sub-replica can satisfy a read at `bookmark`. True only when
371 /// its visible frontier covers the bookmark's commit LSN.
372 pub fn downstream_can_serve(&self, id: &str, bookmark: &CausalBookmark) -> bool {
373 self.downstream_visible_frontier(id)
374 .is_some_and(|frontier| frontier >= bookmark.commit_lsn())
375 }
376
377 /// Select the records to forward to a sub-replica from a batch the
378 /// intermediate has on hand.
379 ///
380 /// `available` is a slice of `(lsn, payload)` the intermediate has
381 /// received from its own upstream, assumed ascending by LSN. The result
382 /// keeps every record with `requested_since_lsn < lsn <= self_applied_lsn`
383 /// — newer than what the sub-replica has, and not beyond what the
384 /// intermediate itself holds. Records the intermediate has buffered but
385 /// not yet applied are withheld, so a sub-replica never sees data ahead of
386 /// its feeder.
387 pub fn records_to_forward<'a, T>(
388 &self,
389 requested_since_lsn: u64,
390 available: &'a [(u64, T)],
391 ) -> Vec<&'a (u64, T)> {
392 let ceiling = self.self_applied_lsn;
393 available
394 .iter()
395 .filter(|(lsn, _)| *lsn > requested_since_lsn && *lsn <= ceiling)
396 .collect()
397 }
398}
399
400// ---------------------------------------------------------------
401// Tests
402// ---------------------------------------------------------------
403
#[cfg(test)]
mod tests {
    //! Unit tests for the cascade policy ([`plan_upstream`],
    //! [`ReplicaClass`]) and the relay bookkeeping ([`CascadeRelay`]). They
    //! exercise only in-memory state; no transport is involved.

    use super::*;
    use crate::replication::election::Member;

    // -- plan_upstream -------------------------------------------------

    #[test]
    fn no_requested_source_streams_from_primary() {
        let (choice, refusal) = plan_upstream("r1", ReplicaClass::AsyncReadReplica, None);
        assert_eq!(choice, UpstreamChoice::Primary);
        assert!(refusal.is_none());
    }

    #[test]
    fn async_read_replica_cascades_from_intermediate() {
        let up = CascadeUpstream::new("inter", "http://inter:50051");
        let (choice, refusal) = plan_upstream("leaf", ReplicaClass::AsyncReadReplica, Some(&up));
        assert!(choice.is_cascade());
        assert_eq!(choice, UpstreamChoice::Intermediate(up));
        assert!(refusal.is_none());
    }

    #[test]
    fn voting_member_refuses_cascade_and_falls_back_to_primary() {
        let up = CascadeUpstream::new("inter", "http://inter:50051");
        let (choice, refusal) = plan_upstream("voter", ReplicaClass::Voting, Some(&up));
        assert_eq!(choice, UpstreamChoice::Primary);
        assert_eq!(refusal, Some(CascadeRefusal::VotingMemberDirectOnly));
    }

    #[test]
    fn node_refuses_to_cascade_from_itself() {
        let up = CascadeUpstream::new("self", "http://self:50051");
        let (choice, refusal) = plan_upstream("self", ReplicaClass::AsyncReadReplica, Some(&up));
        assert_eq!(choice, UpstreamChoice::Primary);
        assert_eq!(refusal, Some(CascadeRefusal::SelfReference));
    }

    #[test]
    fn voting_refusal_takes_precedence_over_self_reference() {
        // A voting member pointed at itself trips both rules; the class check
        // runs first, so that is the reason reported.
        let up = CascadeUpstream::new("self", "http://self:50051");
        let (choice, refusal) = plan_upstream("self", ReplicaClass::Voting, Some(&up));
        assert_eq!(choice, UpstreamChoice::Primary);
        assert_eq!(refusal, Some(CascadeRefusal::VotingMemberDirectOnly));
    }

    #[test]
    fn class_from_member_keeps_voters_direct() {
        assert_eq!(
            ReplicaClass::from_member(&Member::data_voting("v")),
            ReplicaClass::Voting
        );
        assert_eq!(
            ReplicaClass::from_member(&Member::witness("w")),
            ReplicaClass::Voting
        );
        // A non-voting data member is a read-replica candidate.
        assert_eq!(
            ReplicaClass::from_member(&Member::data_catching_up("c")),
            ReplicaClass::AsyncReadReplica
        );
    }

    // -- CascadeRelay --------------------------------------------------

    #[test]
    fn relay_with_no_downstream_reports_own_applied_frontier() {
        let mut relay = CascadeRelay::new("inter");
        relay.record_self_applied(42);
        assert_eq!(relay.upstream_confirmed_lsn(), 42);
    }

    #[test]
    fn register_downstream_holds_slot_and_is_idempotent() {
        let mut relay = CascadeRelay::new("inter");
        assert_eq!(relay.register_downstream("leaf", 10), 10);
        relay.record_downstream_ack("leaf", 25);
        // Reconnect at an older start must not rewind the live slot.
        assert_eq!(relay.register_downstream("leaf", 5), 25);
        assert_eq!(relay.downstream_count(), 1);
    }

    #[test]
    fn reregistering_at_a_forward_lsn_advances_the_slot() {
        let mut relay = CascadeRelay::new("inter");
        assert_eq!(relay.register_downstream("leaf", 10), 10);
        // A reconnect that is already further along moves the slot forward
        // and keeps `sent_lsn >= confirmed_lsn`.
        assert_eq!(relay.register_downstream("leaf", 30), 30);
        let slot = relay.downstream_slot("leaf").unwrap();
        assert_eq!(slot.confirmed_lsn, 30);
        assert_eq!(slot.sent_lsn, 30);
        assert_eq!(relay.downstream_count(), 1);
    }

    #[test]
    fn updates_for_unknown_downstream_are_ignored() {
        let mut relay = CascadeRelay::new("inter");
        relay.record_self_applied(10);
        relay.record_downstream_ack("ghost", 5);
        relay.note_forwarded("ghost", 5);
        // Neither call may create a slot or pin the frontier.
        assert_eq!(relay.downstream_count(), 0);
        assert!(relay.downstream_slot("ghost").is_none());
        assert_eq!(relay.upstream_confirmed_lsn(), 10);
    }

    #[test]
    fn slow_leaf_pins_chain_retention_frontier() {
        let mut relay = CascadeRelay::new("inter");
        relay.record_self_applied(100);
        relay.register_downstream("fast", 0);
        relay.register_downstream("slow", 0);
        relay.record_downstream_ack("fast", 90);
        relay.record_downstream_ack("slow", 40);
        // The intermediate must keep WAL the slow leaf still needs: the
        // frontier it reports upstream is the slowest leaf, not its own
        // applied position.
        assert_eq!(relay.upstream_confirmed_lsn(), 40);

        // Slow leaf catches up → frontier advances to min(self, fast).
        relay.record_downstream_ack("slow", 95);
        assert_eq!(relay.upstream_confirmed_lsn(), 90);

        // Leaves reporting past self_applied should not happen (they cannot
        // confirm un-forwarded data), but if they do the frontier must still
        // clamp at self_applied. Ack strictly beyond it so the clamp is
        // actually exercised.
        relay.record_downstream_ack("fast", 150);
        relay.record_downstream_ack("slow", 120);
        assert_eq!(relay.upstream_confirmed_lsn(), 100);
    }

    #[test]
    fn releasing_slow_leaf_unblocks_frontier() {
        let mut relay = CascadeRelay::new("inter");
        relay.record_self_applied(100);
        relay.register_downstream("slow", 0);
        relay.record_downstream_ack("slow", 10);
        assert_eq!(relay.upstream_confirmed_lsn(), 10);
        assert!(relay.unregister_downstream("slow"));
        assert_eq!(relay.upstream_confirmed_lsn(), 100);
        assert!(!relay.unregister_downstream("slow"));
    }

    #[test]
    fn acks_and_forwards_are_monotonic() {
        let mut relay = CascadeRelay::new("inter");
        relay.record_self_applied(50);
        relay.register_downstream("leaf", 0);
        relay.record_downstream_ack("leaf", 30);
        relay.record_downstream_ack("leaf", 20); // stale, ignored
        relay.note_forwarded("leaf", 45);
        relay.note_forwarded("leaf", 10); // stale, ignored
        let slot = relay.downstream_slot("leaf").unwrap();
        assert_eq!(slot.confirmed_lsn, 30);
        assert_eq!(slot.sent_lsn, 45);
        relay.record_self_applied(20); // stale, ignored
        assert_eq!(relay.self_applied_lsn(), 50);
    }

    #[test]
    fn records_to_forward_bounds_by_since_and_self_applied() {
        let mut relay = CascadeRelay::new("inter");
        relay.record_self_applied(4);
        let available: Vec<(u64, &str)> =
            vec![(1, "a"), (2, "b"), (3, "c"), (4, "d"), (5, "e"), (6, "f")];
        // since=2 → forward 3,4 (5,6 withheld: not yet applied here).
        let picked = relay.records_to_forward(2, &available);
        let lsns: Vec<u64> = picked.iter().map(|(lsn, _)| *lsn).collect();
        assert_eq!(lsns, vec![3, 4]);

        // After applying more, the ceiling rises.
        relay.record_self_applied(6);
        let picked = relay.records_to_forward(2, &available);
        let lsns: Vec<u64> = picked.iter().map(|(lsn, _)| *lsn).collect();
        assert_eq!(lsns, vec![3, 4, 5, 6]);
    }

    #[test]
    fn visible_frontier_is_monotonically_non_increasing_down_chain() {
        let mut relay = CascadeRelay::new("inter");
        relay.record_self_applied(80);
        relay.register_downstream("leaf", 0);
        relay.record_downstream_ack("leaf", 60);
        // The leaf can serve up to its own confirmed, never beyond the feeder.
        assert_eq!(relay.downstream_visible_frontier("leaf"), Some(60));

        // If the leaf somehow confirms beyond the feeder (shouldn't happen),
        // the visible frontier still clamps at self_applied.
        relay.record_downstream_ack("leaf", 200);
        assert_eq!(relay.downstream_visible_frontier("leaf"), Some(80));
        assert_eq!(relay.downstream_visible_frontier("unknown"), None);
    }

    #[test]
    fn downstream_can_serve_bookmark_only_when_frontier_covers_it() {
        let mut relay = CascadeRelay::new("inter");
        relay.record_self_applied(100);
        relay.register_downstream("leaf", 0);
        relay.record_downstream_ack("leaf", 50);
        let within = CausalBookmark::new(1, 50);
        let beyond = CausalBookmark::new(1, 51);
        assert!(relay.downstream_can_serve("leaf", &within));
        assert!(!relay.downstream_can_serve("leaf", &beyond));
        assert!(!relay.downstream_can_serve("missing", &within));
    }

    #[test]
    fn upstream_confirmed_bookmark_stamps_term() {
        let mut relay = CascadeRelay::new("inter");
        relay.record_self_applied(100);
        relay.register_downstream("leaf", 0);
        relay.record_downstream_ack("leaf", 70);
        let bm = relay.upstream_confirmed_bookmark(7);
        assert_eq!(bm.term(), 7);
        assert_eq!(bm.commit_lsn(), 70);
    }
}
588}