Skip to main content

reddb_wire/replication/
range_stream.rs

1//! Range-indexed WAL streaming and per-range catch-up (issue #992).
2//!
3//! The physical WAL stays a single sequential append log — there is no
4//! per-range physical file. Range replication is a *filtered view* over that
5//! one log: every derived [`ChangeRecord`] carries its range identity, owning
6//! term, and ownership epoch (issue #991), so a data member can index and route
7//! records by range without splitting the log.
8//!
9//! This module supplies the transport-agnostic primitives a range replica or a
10//! move-range target uses to ride that filtered view:
11//!
12//! - [`RangeStreamPosition`] — the per-range resume point and authority
13//!   watermark a follower persists. Catch-up restarts from this position rather
14//!   than from the global stream head.
15//! - [`classify_range_record`] / [`plan_range_catchup`] — gate the shared
16//!   stream down to exactly the records a range should apply: only records
17//!   stamped for the range, only those past the resume LSN, only those whose
18//!   term and ownership epoch clear the range authority (stale owners fenced).
19//! - [`RangeStreamProgress`] / [`RangeProgressTracker`] — independent lag and
20//!   progress per range over the one physical stream, enough to reason about
21//!   per-range failover eligibility later (issue #987 parent).
22//!
23//! The actual on-disk apply (the LSN state machine, payload hashing) stays in
24//! `reddb-server`; this crate only describes the routing/gating contract.
25
26use std::collections::BTreeMap;
27
28use serde_json::Value as JsonValue;
29
30use super::change_record::{ChangeRecord, RangeAdmitError, RangeAuthority};
31use super::util::{get_opt_u64, get_u64, object_from_slice, Result};
32
33/// The per-range resume position and authority watermark a range follower
34/// persists. A range replica restarting catch-up hands the primary this
35/// position so streaming resumes from `applied_lsn` for the range instead of
36/// replaying the whole shared WAL, and so the follower keeps fencing records
37/// from a deposed owner.
38///
39/// `applied_lsn` is a *global* WAL LSN — the highest LSN this follower has
40/// applied **for this range**. Because the range's records are sparse within
41/// the shared sequential log, range catch-up admits any record with a strictly
42/// greater LSN (range-local monotonicity) rather than requiring the global
43/// `lsn == last + 1` contiguity the whole-stream applier enforces.
44#[derive(Debug, Clone, Copy, PartialEq, Eq)]
45pub struct RangeStreamPosition {
46    pub range_id: u64,
47    pub applied_lsn: u64,
48    pub accepted_term: u64,
49    pub accepted_epoch: u64,
50}
51
52impl RangeStreamPosition {
53    pub fn new(range_id: u64, applied_lsn: u64, accepted_term: u64, accepted_epoch: u64) -> Self {
54        Self {
55            range_id,
56            applied_lsn,
57            accepted_term,
58            accepted_epoch,
59        }
60    }
61
62    /// A fresh follower for `range_id` that has applied nothing yet and holds
63    /// the lowest possible authority watermark (accepts any term/epoch).
64    pub fn at_origin(range_id: u64) -> Self {
65        Self::new(range_id, 0, 0, 0)
66    }
67
68    /// The authority fence this position currently enforces. A record stamped
69    /// for this range whose term or ownership epoch is below the watermark is a
70    /// write from a stale timeline or deposed owner and is rejected.
71    pub fn authority(&self) -> RangeAuthority {
72        RangeAuthority {
73            range_id: self.range_id,
74            min_term: self.accepted_term,
75            min_ownership_epoch: self.accepted_epoch,
76        }
77    }
78
79    /// Advance this position past an admitted record that belongs to the range.
80    /// The resume LSN moves forward and the authority watermark ratchets up to
81    /// the record's term/epoch so a later stale write cannot slip back in. Only
82    /// records stamped for this range and ahead of the current resume LSN move
83    /// the position; everything else leaves it untouched.
84    pub fn advance(&mut self, record: &ChangeRecord) {
85        if record.range_id != Some(self.range_id) || record.lsn <= self.applied_lsn {
86            return;
87        }
88        self.applied_lsn = record.lsn;
89        if record.term > self.accepted_term {
90            self.accepted_term = record.term;
91        }
92        if let Some(epoch) = record.ownership_epoch {
93            if epoch > self.accepted_epoch {
94                self.accepted_epoch = epoch;
95            }
96        }
97    }
98
99    pub fn encode_json(&self) -> Vec<u8> {
100        let mut obj = serde_json::Map::new();
101        obj.insert(
102            "range_id".to_string(),
103            JsonValue::Number(self.range_id.into()),
104        );
105        obj.insert(
106            "applied_lsn".to_string(),
107            JsonValue::Number(self.applied_lsn.into()),
108        );
109        obj.insert(
110            "accepted_term".to_string(),
111            JsonValue::Number(self.accepted_term.into()),
112        );
113        obj.insert(
114            "accepted_epoch".to_string(),
115            JsonValue::Number(self.accepted_epoch.into()),
116        );
117        serde_json::to_vec(&JsonValue::Object(obj)).unwrap_or_default()
118    }
119
120    pub fn decode_json(bytes: &[u8]) -> Result<Self> {
121        let obj = object_from_slice(bytes)?;
122        Ok(Self {
123            range_id: get_u64(&obj, "range_id")?,
124            applied_lsn: get_opt_u64(&obj, "applied_lsn").unwrap_or(0),
125            accepted_term: get_opt_u64(&obj, "accepted_term").unwrap_or(0),
126            accepted_epoch: get_opt_u64(&obj, "accepted_epoch").unwrap_or(0),
127        })
128    }
129}
130
131/// Why a single record was routed the way it was during range catch-up.
132#[derive(Debug, Clone, Copy, PartialEq, Eq)]
133pub enum RangeStreamDecision {
134    /// Stamped for this range, ahead of the resume LSN, and clears the
135    /// authority fence — apply it.
136    Apply,
137    /// Belongs to a different range (or carries no range identity at all) — not
138    /// this follower's record, skip without touching its position.
139    SkipOtherRange,
140    /// Stamped for this range but at or below the resume LSN — already applied,
141    /// skip idempotently.
142    SkipReplayed,
143    /// Stamped for this range but fenced: a stale term or ownership epoch from a
144    /// deposed owner / superseded timeline.
145    Reject(RangeAdmitError),
146}
147
148/// Route a single record relative to a range's resume position. Routing order:
149/// other-range first (cheapest, most common), then already-applied, then the
150/// authority fence — so a record we have already applied is skipped without
151/// being treated as a fence violation, while a fresh stale-owner write is
152/// rejected.
153pub fn classify_range_record(
154    position: &RangeStreamPosition,
155    record: &ChangeRecord,
156) -> RangeStreamDecision {
157    if record.range_id != Some(position.range_id) {
158        return RangeStreamDecision::SkipOtherRange;
159    }
160    if record.lsn <= position.applied_lsn {
161        return RangeStreamDecision::SkipReplayed;
162    }
163    match position.authority().admit(record) {
164        Ok(()) => RangeStreamDecision::Apply,
165        Err(error) => RangeStreamDecision::Reject(error),
166    }
167}
168
169/// A record that range catch-up refused, with the reason. Surfaced (rather than
170/// silently dropped) so the caller can count fenced records and tell a stale
171/// owner apart from a quiet range.
172#[derive(Debug, Clone, Copy, PartialEq, Eq)]
173pub struct RangeStreamReject {
174    pub lsn: u64,
175    pub error: RangeAdmitError,
176}
177
178/// The result of filtering a slice of the shared logical stream down to one
179/// range's catch-up work. `apply` holds indices into the input slice in
180/// ascending LSN order; `resume` is the position advanced past every applied
181/// record (persist it to make catch-up resumable); `rejected` lists the fenced
182/// records.
183#[derive(Debug, Clone, PartialEq, Eq)]
184pub struct RangeCatchupPlan {
185    pub range_id: u64,
186    pub apply: Vec<usize>,
187    pub rejected: Vec<RangeStreamReject>,
188    pub resume: RangeStreamPosition,
189    pub scanned: usize,
190}
191
192impl RangeCatchupPlan {
193    /// Number of records selected for apply.
194    pub fn apply_count(&self) -> usize {
195        self.apply.len()
196    }
197
198    /// Whether anything was selected to apply.
199    pub fn is_empty(&self) -> bool {
200        self.apply.is_empty()
201    }
202}
203
204/// Filter a slice of the shared logical stream (ascending by LSN) into the
205/// catch-up plan for a single range, resuming from `position`. Only records
206/// stamped for `position.range_id`, past its resume LSN, and clearing its
207/// authority fence are selected; the returned `resume` position has been
208/// advanced past them so a follower can persist it and continue.
209///
210/// The input is assumed LSN-ascending (the WAL is a sequential append log);
211/// records are visited in order so the resume position ratchets monotonically.
212pub fn plan_range_catchup(
213    position: &RangeStreamPosition,
214    records: &[ChangeRecord],
215) -> RangeCatchupPlan {
216    let mut resume = *position;
217    let mut apply = Vec::new();
218    let mut rejected = Vec::new();
219    for (index, record) in records.iter().enumerate() {
220        match classify_range_record(&resume, record) {
221            RangeStreamDecision::Apply => {
222                apply.push(index);
223                resume.advance(record);
224            }
225            RangeStreamDecision::Reject(error) => rejected.push(RangeStreamReject {
226                lsn: record.lsn,
227                error,
228            }),
229            RangeStreamDecision::SkipOtherRange | RangeStreamDecision::SkipReplayed => {}
230        }
231    }
232    RangeCatchupPlan {
233        range_id: position.range_id,
234        apply,
235        rejected,
236        resume,
237        scanned: records.len(),
238    }
239}
240
241/// Independent streaming progress for one range over the shared physical WAL.
242///
243/// All three LSNs are global WAL LSNs scoped to this range's records:
244/// `primary_lsn` is the highest the primary has produced for the range,
245/// `streamed_lsn` the highest shipped to the follower, `applied_lsn` the
246/// highest the follower has durably applied. Their gaps give per-range lag that
247/// is independent of every other range riding the same WAL — the basis for
248/// per-range failover eligibility (issue #987).
249#[derive(Debug, Clone, Copy, PartialEq, Eq)]
250pub struct RangeStreamProgress {
251    pub range_id: u64,
252    pub applied_lsn: u64,
253    pub streamed_lsn: u64,
254    pub primary_lsn: u64,
255}
256
257impl RangeStreamProgress {
258    pub fn new(range_id: u64) -> Self {
259        Self {
260            range_id,
261            applied_lsn: 0,
262            streamed_lsn: 0,
263            primary_lsn: 0,
264        }
265    }
266
267    /// Records this range still has to apply to match the primary frontier.
268    /// Saturating so a follower transiently ahead of an observed frontier
269    /// reports zero rather than underflowing.
270    pub fn apply_lag(&self) -> u64 {
271        self.primary_lsn.saturating_sub(self.applied_lsn)
272    }
273
274    /// Records produced for this range that have not yet been shipped.
275    pub fn stream_lag(&self) -> u64 {
276        self.primary_lsn.saturating_sub(self.streamed_lsn)
277    }
278
279    /// Whether the follower has applied everything the primary has produced for
280    /// the range. False until a primary frontier has actually been observed, so
281    /// an unknown range is never reported as caught up.
282    pub fn is_caught_up(&self) -> bool {
283        self.primary_lsn > 0 && self.applied_lsn >= self.primary_lsn
284    }
285
286    /// Whether this range is within `max_lag` records of the primary — a
287    /// per-range gate a later failover decision can consult. Requires an
288    /// observed primary frontier.
289    pub fn failover_eligible(&self, max_lag: u64) -> bool {
290        self.primary_lsn > 0 && self.apply_lag() <= max_lag
291    }
292
293    pub fn encode_json(&self) -> Vec<u8> {
294        let mut obj = serde_json::Map::new();
295        obj.insert(
296            "range_id".to_string(),
297            JsonValue::Number(self.range_id.into()),
298        );
299        obj.insert(
300            "applied_lsn".to_string(),
301            JsonValue::Number(self.applied_lsn.into()),
302        );
303        obj.insert(
304            "streamed_lsn".to_string(),
305            JsonValue::Number(self.streamed_lsn.into()),
306        );
307        obj.insert(
308            "primary_lsn".to_string(),
309            JsonValue::Number(self.primary_lsn.into()),
310        );
311        serde_json::to_vec(&JsonValue::Object(obj)).unwrap_or_default()
312    }
313
314    pub fn decode_json(bytes: &[u8]) -> Result<Self> {
315        let obj = object_from_slice(bytes)?;
316        Ok(Self {
317            range_id: get_u64(&obj, "range_id")?,
318            applied_lsn: get_opt_u64(&obj, "applied_lsn").unwrap_or(0),
319            streamed_lsn: get_opt_u64(&obj, "streamed_lsn").unwrap_or(0),
320            primary_lsn: get_opt_u64(&obj, "primary_lsn").unwrap_or(0),
321        })
322    }
323}
324
325/// Tracks streaming progress for many ranges over the one physical WAL. A data
326/// member feeds every derived record through [`index_record`](Self::index_record)
327/// to maintain each range's primary frontier without splitting the log, and
328/// notes shipped/applied LSNs per range. Every range advances independently, so
329/// one lagging range does not skew another's lag or failover eligibility.
330///
331/// All updates are monotonic (`max`): out-of-order or replayed observations
332/// never move a frontier backward.
333#[derive(Debug, Clone, Default)]
334pub struct RangeProgressTracker {
335    ranges: BTreeMap<u64, RangeStreamProgress>,
336}
337
338impl RangeProgressTracker {
339    pub fn new() -> Self {
340        Self::default()
341    }
342
343    fn slot(&mut self, range_id: u64) -> &mut RangeStreamProgress {
344        self.ranges
345            .entry(range_id)
346            .or_insert_with(|| RangeStreamProgress::new(range_id))
347    }
348
349    /// Index one derived record by its range identity, bumping that range's
350    /// primary frontier. Records that carry no range identity (legacy /
351    /// non-range-replicated) are ignored — they belong to no range's stream.
352    pub fn index_record(&mut self, record: &ChangeRecord) {
353        let Some(range_id) = record.range_id else {
354            return;
355        };
356        let slot = self.slot(range_id);
357        if record.lsn > slot.primary_lsn {
358            slot.primary_lsn = record.lsn;
359        }
360    }
361
362    /// Note that records for `range_id` up to `lsn` have been shipped to the
363    /// follower. Also raises the primary frontier if it lagged behind, since you
364    /// cannot stream past what was produced.
365    pub fn note_streamed(&mut self, range_id: u64, lsn: u64) {
366        let slot = self.slot(range_id);
367        if lsn > slot.streamed_lsn {
368            slot.streamed_lsn = lsn;
369        }
370        if lsn > slot.primary_lsn {
371            slot.primary_lsn = lsn;
372        }
373    }
374
375    /// Note that the follower has durably applied records for `range_id` up to
376    /// `lsn`. Raises the streamed and primary frontiers if they lagged, since an
377    /// applied record was necessarily streamed and produced.
378    pub fn note_applied(&mut self, range_id: u64, lsn: u64) {
379        let slot = self.slot(range_id);
380        if lsn > slot.applied_lsn {
381            slot.applied_lsn = lsn;
382        }
383        if lsn > slot.streamed_lsn {
384            slot.streamed_lsn = lsn;
385        }
386        if lsn > slot.primary_lsn {
387            slot.primary_lsn = lsn;
388        }
389    }
390
391    /// Adopt a follower's reported position for `range_id` as the applied
392    /// frontier — the inbound counterpart to [`note_applied`](Self::note_applied)
393    /// when a follower acks with a [`RangeStreamPosition`].
394    pub fn observe_position(&mut self, position: &RangeStreamPosition) {
395        self.note_applied(position.range_id, position.applied_lsn);
396    }
397
398    pub fn progress(&self, range_id: u64) -> Option<&RangeStreamProgress> {
399        self.ranges.get(&range_id)
400    }
401
402    /// Apply lag for one range, or `None` if the range is unknown.
403    pub fn apply_lag(&self, range_id: u64) -> Option<u64> {
404        self.ranges
405            .get(&range_id)
406            .map(RangeStreamProgress::apply_lag)
407    }
408
409    /// Iterate every tracked range's progress, ascending by range id.
410    pub fn iter(&self) -> impl Iterator<Item = &RangeStreamProgress> {
411        self.ranges.values()
412    }
413
414    pub fn len(&self) -> usize {
415        self.ranges.len()
416    }
417
418    pub fn is_empty(&self) -> bool {
419        self.ranges.is_empty()
420    }
421
422    /// The ranges currently within `max_lag` of their primary frontier —
423    /// candidates a later per-range failover decision may promote. Ascending by
424    /// range id.
425    pub fn failover_eligible(&self, max_lag: u64) -> Vec<u64> {
426        self.ranges
427            .values()
428            .filter(|progress| progress.failover_eligible(max_lag))
429            .map(|progress| progress.range_id)
430            .collect()
431    }
432}
433
434#[cfg(test)]
435mod tests {
436    use super::*;
437    use crate::replication::change_record::ChangeOperation;
438
439    fn record(range_id: Option<u64>, lsn: u64, term: u64, epoch: Option<u64>) -> ChangeRecord {
440        ChangeRecord {
441            term,
442            lsn,
443            timestamp: 1,
444            operation: ChangeOperation::Insert,
445            collection: "c".to_string(),
446            entity_id: lsn,
447            entity_kind: "row".to_string(),
448            entity_bytes: Some(vec![1]),
449            metadata: None,
450            refresh_records: None,
451            range_id,
452            ownership_epoch: epoch,
453        }
454    }
455
456    #[test]
457    fn position_round_trips_on_the_json_wire() {
458        let pos = RangeStreamPosition::new(7, 42, 3, 5);
459        assert_eq!(
460            RangeStreamPosition::decode_json(&pos.encode_json()).unwrap(),
461            pos
462        );
463    }
464
465    #[test]
466    fn classify_routes_by_range_identity() {
467        // Resume at origin for range 7.
468        let pos = RangeStreamPosition::at_origin(7);
469        // A record for range 7 is applied.
470        assert_eq!(
471            classify_range_record(&pos, &record(Some(7), 1, 1, Some(1))),
472            RangeStreamDecision::Apply
473        );
474        // A record for a different range is not this follower's business.
475        assert_eq!(
476            classify_range_record(&pos, &record(Some(9), 1, 1, Some(1))),
477            RangeStreamDecision::SkipOtherRange
478        );
479        // A legacy record with no range identity is skipped, not applied.
480        assert_eq!(
481            classify_range_record(&pos, &record(None, 1, 1, None)),
482            RangeStreamDecision::SkipOtherRange
483        );
484    }
485
486    #[test]
487    fn plan_filters_one_range_out_of_a_shared_stream() {
488        // A single physical WAL slice interleaving two ranges.
489        let stream = vec![
490            record(Some(7), 1, 1, Some(1)),
491            record(Some(9), 2, 1, Some(1)),
492            record(Some(7), 3, 1, Some(1)),
493            record(None, 4, 1, None), // legacy / non-range
494            record(Some(7), 5, 1, Some(1)),
495        ];
496        let plan = plan_range_catchup(&RangeStreamPosition::at_origin(7), &stream);
497        // Only range 7's records, in LSN order.
498        assert_eq!(plan.apply, vec![0, 2, 4]);
499        assert!(plan.rejected.is_empty());
500        assert_eq!(plan.scanned, 5);
501        // Resume position advanced to range 7's highest applied LSN.
502        assert_eq!(plan.resume.applied_lsn, 5);
503        assert_eq!(plan.apply_count(), 3);
504    }
505
506    #[test]
507    fn plan_resumes_from_a_known_range_position() {
508        let stream = vec![
509            record(Some(7), 1, 1, Some(1)),
510            record(Some(7), 3, 1, Some(1)),
511            record(Some(7), 5, 1, Some(1)),
512        ];
513        // Already applied through LSN 3 for range 7.
514        let pos = RangeStreamPosition::new(7, 3, 1, 1);
515        let plan = plan_range_catchup(&pos, &stream);
516        // LSN 1 and 3 are replayed-skipped; only 5 applies.
517        assert_eq!(plan.apply, vec![2]);
518        assert_eq!(plan.resume.applied_lsn, 5);
519    }
520
521    #[test]
522    fn plan_rejects_stale_ownership_epoch_and_term() {
523        // Follower accepts term >= 3, epoch >= 4 for range 7.
524        let pos = RangeStreamPosition::new(7, 0, 3, 4);
525        let stream = vec![
526            record(Some(7), 1, 3, Some(2)), // stale epoch
527            record(Some(7), 2, 1, Some(9)), // stale term (checked first)
528            record(Some(7), 3, 3, Some(4)), // current — admitted
529        ];
530        let plan = plan_range_catchup(&pos, &stream);
531        assert_eq!(plan.apply, vec![2]);
532        assert_eq!(
533            plan.rejected,
534            vec![
535                RangeStreamReject {
536                    lsn: 1,
537                    error: RangeAdmitError::StaleOwnershipEpoch {
538                        record_epoch: 2,
539                        accepted_epoch: 4,
540                    },
541                },
542                RangeStreamReject {
543                    lsn: 2,
544                    error: RangeAdmitError::StaleTerm {
545                        record_term: 1,
546                        accepted_term: 3,
547                    },
548                },
549            ]
550        );
551        // The rejected stale writes never moved the resume position.
552        assert_eq!(plan.resume.applied_lsn, 3);
553        assert_eq!(plan.resume.accepted_epoch, 4);
554    }
555
556    #[test]
557    fn position_advance_ratchets_authority_so_a_later_stale_write_is_fenced() {
558        let mut pos = RangeStreamPosition::new(7, 0, 1, 1);
559        // Apply a record that lifts the range to term 4, epoch 6.
560        pos.advance(&record(Some(7), 10, 4, Some(6)));
561        assert_eq!(pos.applied_lsn, 10);
562        assert_eq!(pos.accepted_term, 4);
563        assert_eq!(pos.accepted_epoch, 6);
564        // A returning ex-owner at the old epoch is now fenced.
565        assert_eq!(
566            classify_range_record(&pos, &record(Some(7), 11, 4, Some(5))),
567            RangeStreamDecision::Reject(RangeAdmitError::StaleOwnershipEpoch {
568                record_epoch: 5,
569                accepted_epoch: 6,
570            })
571        );
572    }
573
574    #[test]
575    fn progress_round_trips_and_reports_lag() {
576        let mut progress = RangeStreamProgress::new(7);
577        progress.primary_lsn = 100;
578        progress.streamed_lsn = 80;
579        progress.applied_lsn = 60;
580        assert_eq!(progress.apply_lag(), 40);
581        assert_eq!(progress.stream_lag(), 20);
582        assert!(!progress.is_caught_up());
583        assert!(progress.failover_eligible(50));
584        assert!(!progress.failover_eligible(10));
585        assert_eq!(
586            RangeStreamProgress::decode_json(&progress.encode_json()).unwrap(),
587            progress
588        );
589    }
590
591    #[test]
592    fn tracker_reports_independent_lag_for_multiple_ranges() {
593        let mut tracker = RangeProgressTracker::new();
594        // Index a shared WAL slice spanning ranges 7 and 9.
595        for rec in [
596            record(Some(7), 1, 1, Some(1)),
597            record(Some(9), 2, 1, Some(1)),
598            record(Some(7), 3, 1, Some(1)),
599            record(Some(9), 4, 1, Some(1)),
600            record(None, 5, 1, None), // ignored by the index
601            record(Some(9), 6, 1, Some(1)),
602        ] {
603            tracker.index_record(&rec);
604        }
605        // Range 7 has fully applied; range 9 lags behind.
606        tracker.note_applied(7, 3);
607        tracker.note_applied(9, 2);
608
609        assert_eq!(tracker.len(), 2);
610        // Range 7: primary frontier 3, applied 3 → caught up, zero lag.
611        assert_eq!(tracker.apply_lag(7), Some(0));
612        assert!(tracker.progress(7).unwrap().is_caught_up());
613        // Range 9: primary frontier 6, applied 2 → lag 4, independent of range 7.
614        assert_eq!(tracker.apply_lag(9), Some(4));
615        assert!(!tracker.progress(9).unwrap().is_caught_up());
616        // The legacy record minted no range slot.
617        assert_eq!(tracker.apply_lag(99), None);
618
619        // Only the caught-up range is failover-eligible at a tight bound; the
620        // lagging range joins once the bound widens.
621        assert_eq!(tracker.failover_eligible(0), vec![7]);
622        assert_eq!(tracker.failover_eligible(10), vec![7, 9]);
623    }
624
625    #[test]
626    fn tracker_frontiers_are_monotonic() {
627        let mut tracker = RangeProgressTracker::new();
628        tracker.note_applied(7, 50);
629        // A stale, out-of-order observation must not move anything backward.
630        tracker.note_streamed(7, 10);
631        tracker.note_applied(7, 20);
632        tracker.index_record(&record(Some(7), 5, 1, Some(1)));
633        let progress = tracker.progress(7).unwrap();
634        assert_eq!(progress.applied_lsn, 50);
635        assert_eq!(progress.streamed_lsn, 50);
636        assert_eq!(progress.primary_lsn, 50);
637    }
638
639    #[test]
640    fn observe_position_adopts_follower_applied_frontier() {
641        let mut tracker = RangeProgressTracker::new();
642        tracker.index_record(&record(Some(7), 9, 1, Some(1)));
643        tracker.observe_position(&RangeStreamPosition::new(7, 7, 1, 1));
644        assert_eq!(tracker.apply_lag(7), Some(2));
645    }
646}