1use std::collections::BTreeMap;
27
28use serde_json::Value as JsonValue;
29
30use super::change_record::{ChangeRecord, RangeAdmitError, RangeAuthority};
31use super::util::{get_opt_u64, get_u64, object_from_slice, Result};
32
/// Cursor recording how far one range's change stream has been consumed.
///
/// Alongside the applied LSN it carries the term and ownership epoch already
/// accepted for the range; [`RangeStreamPosition::authority`] exposes those two
/// values as the minimums that later records are admitted against.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct RangeStreamPosition {
    /// Identifier of the range this cursor belongs to.
    pub range_id: u64,
    /// LSN of the newest record folded in; `at_origin` starts it at `0`.
    pub applied_lsn: u64,
    /// Term accepted so far; `advance` only ever raises it.
    pub accepted_term: u64,
    /// Ownership epoch accepted so far; `advance` only ever raises it.
    pub accepted_epoch: u64,
}
51
52impl RangeStreamPosition {
53 pub fn new(range_id: u64, applied_lsn: u64, accepted_term: u64, accepted_epoch: u64) -> Self {
54 Self {
55 range_id,
56 applied_lsn,
57 accepted_term,
58 accepted_epoch,
59 }
60 }
61
62 pub fn at_origin(range_id: u64) -> Self {
65 Self::new(range_id, 0, 0, 0)
66 }
67
68 pub fn authority(&self) -> RangeAuthority {
72 RangeAuthority {
73 range_id: self.range_id,
74 min_term: self.accepted_term,
75 min_ownership_epoch: self.accepted_epoch,
76 }
77 }
78
79 pub fn advance(&mut self, record: &ChangeRecord) {
85 if record.range_id != Some(self.range_id) || record.lsn <= self.applied_lsn {
86 return;
87 }
88 self.applied_lsn = record.lsn;
89 if record.term > self.accepted_term {
90 self.accepted_term = record.term;
91 }
92 if let Some(epoch) = record.ownership_epoch {
93 if epoch > self.accepted_epoch {
94 self.accepted_epoch = epoch;
95 }
96 }
97 }
98
99 pub fn encode_json(&self) -> Vec<u8> {
100 let mut obj = serde_json::Map::new();
101 obj.insert(
102 "range_id".to_string(),
103 JsonValue::Number(self.range_id.into()),
104 );
105 obj.insert(
106 "applied_lsn".to_string(),
107 JsonValue::Number(self.applied_lsn.into()),
108 );
109 obj.insert(
110 "accepted_term".to_string(),
111 JsonValue::Number(self.accepted_term.into()),
112 );
113 obj.insert(
114 "accepted_epoch".to_string(),
115 JsonValue::Number(self.accepted_epoch.into()),
116 );
117 serde_json::to_vec(&JsonValue::Object(obj)).unwrap_or_default()
118 }
119
120 pub fn decode_json(bytes: &[u8]) -> Result<Self> {
121 let obj = object_from_slice(bytes)?;
122 Ok(Self {
123 range_id: get_u64(&obj, "range_id")?,
124 applied_lsn: get_opt_u64(&obj, "applied_lsn").unwrap_or(0),
125 accepted_term: get_opt_u64(&obj, "accepted_term").unwrap_or(0),
126 accepted_epoch: get_opt_u64(&obj, "accepted_epoch").unwrap_or(0),
127 })
128 }
129}
130
131#[derive(Debug, Clone, Copy, PartialEq, Eq)]
133pub enum RangeStreamDecision {
134 Apply,
137 SkipOtherRange,
140 SkipReplayed,
143 Reject(RangeAdmitError),
146}
147
148pub fn classify_range_record(
154 position: &RangeStreamPosition,
155 record: &ChangeRecord,
156) -> RangeStreamDecision {
157 if record.range_id != Some(position.range_id) {
158 return RangeStreamDecision::SkipOtherRange;
159 }
160 if record.lsn <= position.applied_lsn {
161 return RangeStreamDecision::SkipReplayed;
162 }
163 match position.authority().admit(record) {
164 Ok(()) => RangeStreamDecision::Apply,
165 Err(error) => RangeStreamDecision::Reject(error),
166 }
167}
168
169#[derive(Debug, Clone, Copy, PartialEq, Eq)]
173pub struct RangeStreamReject {
174 pub lsn: u64,
175 pub error: RangeAdmitError,
176}
177
178#[derive(Debug, Clone, PartialEq, Eq)]
184pub struct RangeCatchupPlan {
185 pub range_id: u64,
186 pub apply: Vec<usize>,
187 pub rejected: Vec<RangeStreamReject>,
188 pub resume: RangeStreamPosition,
189 pub scanned: usize,
190}
191
192impl RangeCatchupPlan {
193 pub fn apply_count(&self) -> usize {
195 self.apply.len()
196 }
197
198 pub fn is_empty(&self) -> bool {
200 self.apply.is_empty()
201 }
202}
203
204pub fn plan_range_catchup(
213 position: &RangeStreamPosition,
214 records: &[ChangeRecord],
215) -> RangeCatchupPlan {
216 let mut resume = *position;
217 let mut apply = Vec::new();
218 let mut rejected = Vec::new();
219 for (index, record) in records.iter().enumerate() {
220 match classify_range_record(&resume, record) {
221 RangeStreamDecision::Apply => {
222 apply.push(index);
223 resume.advance(record);
224 }
225 RangeStreamDecision::Reject(error) => rejected.push(RangeStreamReject {
226 lsn: record.lsn,
227 error,
228 }),
229 RangeStreamDecision::SkipOtherRange | RangeStreamDecision::SkipReplayed => {}
230 }
231 }
232 RangeCatchupPlan {
233 range_id: position.range_id,
234 apply,
235 rejected,
236 resume,
237 scanned: records.len(),
238 }
239}
240
/// Three LSN frontiers tracked for one range; the lag figures are derived from
/// them.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct RangeStreamProgress {
    /// Range these frontiers describe.
    pub range_id: u64,
    /// Highest LSN recorded as applied.
    pub applied_lsn: u64,
    /// Highest LSN recorded as streamed.
    pub streamed_lsn: u64,
    /// Highest LSN seen for the range; the reference point for both lag
    /// figures. NOTE(review): presumably the primary's frontier — confirm.
    pub primary_lsn: u64,
}
256
257impl RangeStreamProgress {
258 pub fn new(range_id: u64) -> Self {
259 Self {
260 range_id,
261 applied_lsn: 0,
262 streamed_lsn: 0,
263 primary_lsn: 0,
264 }
265 }
266
267 pub fn apply_lag(&self) -> u64 {
271 self.primary_lsn.saturating_sub(self.applied_lsn)
272 }
273
274 pub fn stream_lag(&self) -> u64 {
276 self.primary_lsn.saturating_sub(self.streamed_lsn)
277 }
278
279 pub fn is_caught_up(&self) -> bool {
283 self.primary_lsn > 0 && self.applied_lsn >= self.primary_lsn
284 }
285
286 pub fn failover_eligible(&self, max_lag: u64) -> bool {
290 self.primary_lsn > 0 && self.apply_lag() <= max_lag
291 }
292
293 pub fn encode_json(&self) -> Vec<u8> {
294 let mut obj = serde_json::Map::new();
295 obj.insert(
296 "range_id".to_string(),
297 JsonValue::Number(self.range_id.into()),
298 );
299 obj.insert(
300 "applied_lsn".to_string(),
301 JsonValue::Number(self.applied_lsn.into()),
302 );
303 obj.insert(
304 "streamed_lsn".to_string(),
305 JsonValue::Number(self.streamed_lsn.into()),
306 );
307 obj.insert(
308 "primary_lsn".to_string(),
309 JsonValue::Number(self.primary_lsn.into()),
310 );
311 serde_json::to_vec(&JsonValue::Object(obj)).unwrap_or_default()
312 }
313
314 pub fn decode_json(bytes: &[u8]) -> Result<Self> {
315 let obj = object_from_slice(bytes)?;
316 Ok(Self {
317 range_id: get_u64(&obj, "range_id")?,
318 applied_lsn: get_opt_u64(&obj, "applied_lsn").unwrap_or(0),
319 streamed_lsn: get_opt_u64(&obj, "streamed_lsn").unwrap_or(0),
320 primary_lsn: get_opt_u64(&obj, "primary_lsn").unwrap_or(0),
321 })
322 }
323}
324
325#[derive(Debug, Clone, Default)]
334pub struct RangeProgressTracker {
335 ranges: BTreeMap<u64, RangeStreamProgress>,
336}
337
338impl RangeProgressTracker {
339 pub fn new() -> Self {
340 Self::default()
341 }
342
343 fn slot(&mut self, range_id: u64) -> &mut RangeStreamProgress {
344 self.ranges
345 .entry(range_id)
346 .or_insert_with(|| RangeStreamProgress::new(range_id))
347 }
348
349 pub fn index_record(&mut self, record: &ChangeRecord) {
353 let Some(range_id) = record.range_id else {
354 return;
355 };
356 let slot = self.slot(range_id);
357 if record.lsn > slot.primary_lsn {
358 slot.primary_lsn = record.lsn;
359 }
360 }
361
362 pub fn note_streamed(&mut self, range_id: u64, lsn: u64) {
366 let slot = self.slot(range_id);
367 if lsn > slot.streamed_lsn {
368 slot.streamed_lsn = lsn;
369 }
370 if lsn > slot.primary_lsn {
371 slot.primary_lsn = lsn;
372 }
373 }
374
375 pub fn note_applied(&mut self, range_id: u64, lsn: u64) {
379 let slot = self.slot(range_id);
380 if lsn > slot.applied_lsn {
381 slot.applied_lsn = lsn;
382 }
383 if lsn > slot.streamed_lsn {
384 slot.streamed_lsn = lsn;
385 }
386 if lsn > slot.primary_lsn {
387 slot.primary_lsn = lsn;
388 }
389 }
390
391 pub fn observe_position(&mut self, position: &RangeStreamPosition) {
395 self.note_applied(position.range_id, position.applied_lsn);
396 }
397
398 pub fn progress(&self, range_id: u64) -> Option<&RangeStreamProgress> {
399 self.ranges.get(&range_id)
400 }
401
402 pub fn apply_lag(&self, range_id: u64) -> Option<u64> {
404 self.ranges
405 .get(&range_id)
406 .map(RangeStreamProgress::apply_lag)
407 }
408
409 pub fn iter(&self) -> impl Iterator<Item = &RangeStreamProgress> {
411 self.ranges.values()
412 }
413
414 pub fn len(&self) -> usize {
415 self.ranges.len()
416 }
417
418 pub fn is_empty(&self) -> bool {
419 self.ranges.is_empty()
420 }
421
422 pub fn failover_eligible(&self, max_lag: u64) -> Vec<u64> {
426 self.ranges
427 .values()
428 .filter(|progress| progress.failover_eligible(max_lag))
429 .map(|progress| progress.range_id)
430 .collect()
431 }
432}
433
#[cfg(test)]
mod tests {
    use super::*;
    use crate::replication::change_record::ChangeOperation;

    /// Builds an insert record whose only varying parts are the range id, LSN,
    /// term and ownership epoch; the entity id mirrors the LSN.
    fn record(range_id: Option<u64>, lsn: u64, term: u64, epoch: Option<u64>) -> ChangeRecord {
        ChangeRecord {
            term,
            lsn,
            timestamp: 1,
            operation: ChangeOperation::Insert,
            collection: String::from("c"),
            entity_id: lsn,
            entity_kind: String::from("row"),
            entity_bytes: Some(vec![1]),
            metadata: None,
            refresh_records: None,
            range_id,
            ownership_epoch: epoch,
        }
    }

    #[test]
    fn position_round_trips_on_the_json_wire() {
        let original = RangeStreamPosition::new(7, 42, 3, 5);
        let wire = original.encode_json();
        let decoded = RangeStreamPosition::decode_json(&wire).unwrap();
        assert_eq!(decoded, original);
    }

    #[test]
    fn classify_routes_by_range_identity() {
        let pos = RangeStreamPosition::at_origin(7);
        let cases = [
            // Same range, fresh LSN.
            (
                record(Some(7), 1, 1, Some(1)),
                RangeStreamDecision::Apply,
            ),
            // Different range.
            (
                record(Some(9), 1, 1, Some(1)),
                RangeStreamDecision::SkipOtherRange,
            ),
            // No range at all.
            (
                record(None, 1, 1, None),
                RangeStreamDecision::SkipOtherRange,
            ),
        ];
        for (rec, expected) in &cases {
            assert_eq!(classify_range_record(&pos, rec), *expected);
        }
    }

    #[test]
    fn plan_filters_one_range_out_of_a_shared_stream() {
        let stream = vec![
            record(Some(7), 1, 1, Some(1)),
            record(Some(9), 2, 1, Some(1)),
            record(Some(7), 3, 1, Some(1)),
            record(None, 4, 1, None),
            record(Some(7), 5, 1, Some(1)),
        ];
        let origin = RangeStreamPosition::at_origin(7);

        let plan = plan_range_catchup(&origin, &stream);

        // Only the three range-7 records are selected, identified by slice index.
        assert_eq!(plan.apply, vec![0, 2, 4]);
        assert!(plan.rejected.is_empty());
        assert_eq!(plan.scanned, 5);
        assert_eq!(plan.resume.applied_lsn, 5);
        assert_eq!(plan.apply_count(), 3);
    }

    #[test]
    fn plan_resumes_from_a_known_range_position() {
        let stream = vec![
            record(Some(7), 1, 1, Some(1)),
            record(Some(7), 3, 1, Some(1)),
            record(Some(7), 5, 1, Some(1)),
        ];
        // LSNs 1 and 3 are already applied, so only the last record is new.
        let pos = RangeStreamPosition::new(7, 3, 1, 1);

        let plan = plan_range_catchup(&pos, &stream);

        assert_eq!(plan.apply, vec![2]);
        assert_eq!(plan.resume.applied_lsn, 5);
    }

    #[test]
    fn plan_rejects_stale_ownership_epoch_and_term() {
        let pos = RangeStreamPosition::new(7, 0, 3, 4);
        let stream = vec![
            // Epoch 2 is behind the accepted epoch 4.
            record(Some(7), 1, 3, Some(2)),
            // Term 1 is behind the accepted term 3.
            record(Some(7), 2, 1, Some(9)),
            // Matches the accepted term and epoch.
            record(Some(7), 3, 3, Some(4)),
        ];

        let plan = plan_range_catchup(&pos, &stream);

        let stale_epoch = RangeStreamReject {
            lsn: 1,
            error: RangeAdmitError::StaleOwnershipEpoch {
                record_epoch: 2,
                accepted_epoch: 4,
            },
        };
        let stale_term = RangeStreamReject {
            lsn: 2,
            error: RangeAdmitError::StaleTerm {
                record_term: 1,
                accepted_term: 3,
            },
        };
        assert_eq!(plan.apply, vec![2]);
        assert_eq!(plan.rejected, vec![stale_epoch, stale_term]);
        assert_eq!(plan.resume.applied_lsn, 3);
        assert_eq!(plan.resume.accepted_epoch, 4);
    }

    #[test]
    fn position_advance_ratchets_authority_so_a_later_stale_write_is_fenced() {
        let mut pos = RangeStreamPosition::new(7, 0, 1, 1);
        pos.advance(&record(Some(7), 10, 4, Some(6)));

        assert_eq!(pos.applied_lsn, 10);
        assert_eq!(pos.accepted_term, 4);
        assert_eq!(pos.accepted_epoch, 6);

        // A newer LSN carrying the older epoch 5 must now be refused.
        let late_write = record(Some(7), 11, 4, Some(5));
        let expected = RangeStreamDecision::Reject(RangeAdmitError::StaleOwnershipEpoch {
            record_epoch: 5,
            accepted_epoch: 6,
        });
        assert_eq!(classify_range_record(&pos, &late_write), expected);
    }

    #[test]
    fn progress_round_trips_and_reports_lag() {
        let mut progress = RangeStreamProgress::new(7);
        progress.primary_lsn = 100;
        progress.streamed_lsn = 80;
        progress.applied_lsn = 60;

        assert_eq!(progress.apply_lag(), 40);
        assert_eq!(progress.stream_lag(), 20);
        assert!(!progress.is_caught_up());
        assert!(progress.failover_eligible(50));
        assert!(!progress.failover_eligible(10));

        let wire = progress.encode_json();
        let decoded = RangeStreamProgress::decode_json(&wire).unwrap();
        assert_eq!(decoded, progress);
    }

    #[test]
    fn tracker_reports_independent_lag_for_multiple_ranges() {
        let stream = [
            record(Some(7), 1, 1, Some(1)),
            record(Some(9), 2, 1, Some(1)),
            record(Some(7), 3, 1, Some(1)),
            record(Some(9), 4, 1, Some(1)),
            record(None, 5, 1, None),
            record(Some(9), 6, 1, Some(1)),
        ];
        let mut tracker = RangeProgressTracker::new();
        for rec in &stream {
            tracker.index_record(rec);
        }
        tracker.note_applied(7, 3);
        tracker.note_applied(9, 2);

        // The record without a range id creates no entry.
        assert_eq!(tracker.len(), 2);

        assert_eq!(tracker.apply_lag(7), Some(0));
        assert!(tracker.progress(7).unwrap().is_caught_up());

        assert_eq!(tracker.apply_lag(9), Some(4));
        assert!(!tracker.progress(9).unwrap().is_caught_up());

        assert_eq!(tracker.apply_lag(99), None);

        assert_eq!(tracker.failover_eligible(0), vec![7]);
        assert_eq!(tracker.failover_eligible(10), vec![7, 9]);
    }

    #[test]
    fn tracker_frontiers_are_monotonic() {
        let mut tracker = RangeProgressTracker::new();
        tracker.note_applied(7, 50);
        // Every later observation is older than LSN 50 and must not move anything.
        tracker.note_streamed(7, 10);
        tracker.note_applied(7, 20);
        tracker.index_record(&record(Some(7), 5, 1, Some(1)));

        let progress = tracker.progress(7).unwrap();
        assert_eq!(progress.applied_lsn, 50);
        assert_eq!(progress.streamed_lsn, 50);
        assert_eq!(progress.primary_lsn, 50);
    }

    #[test]
    fn observe_position_adopts_follower_applied_frontier() {
        let mut tracker = RangeProgressTracker::new();
        tracker.index_record(&record(Some(7), 9, 1, Some(1)));

        let follower = RangeStreamPosition::new(7, 7, 1, 1);
        tracker.observe_position(&follower);

        assert_eq!(tracker.apply_lag(7), Some(2));
    }
}