use std::collections::HashMap;
use crate::state::{Checkpoint, ResumeKey};
pub(super) fn merge_monotonic(
mut candidate: HashMap<ResumeKey, Checkpoint>,
disk_state: HashMap<ResumeKey, Checkpoint>,
deletes: &HashMap<ResumeKey, u64>,
) -> HashMap<ResumeKey, Checkpoint> {
for (k, disk_cp) in disk_state {
if let Some(&pre_seq) = deletes.get(&k) {
if disk_cp.last_committed_sequence > pre_seq {
candidate.insert(k, disk_cp);
}
continue;
}
match candidate.get(&k) {
Some(cand) if disk_cp.last_committed_sequence >= cand.last_committed_sequence => {
candidate.insert(k, disk_cp);
}
Some(_) => {}
None => {
candidate.insert(k, disk_cp);
}
}
}
candidate
}
#[cfg(test)]
#[allow(
clippy::unwrap_used,
clippy::panic,
reason = "test code: unwrap and panic on unexpected variant are the standard test diagnostics"
)]
mod tests {
use serde_json::json;
use url::Url;
use std::collections::HashMap;
use super::merge_monotonic;
use crate::state::{Checkpoint, ResumeKey};
fn key(n: u8) -> ResumeKey {
ResumeKey::new(
&Url::parse("https://a/").unwrap(),
"mars",
&json!({"n": n}),
None,
)
.unwrap()
}
fn cp(seq: u64) -> Checkpoint {
Checkpoint::new(seq, None)
}
fn map(pairs: &[(u8, u64)]) -> HashMap<ResumeKey, Checkpoint> {
pairs.iter().map(|(k, s)| (key(*k), cp(*s))).collect()
}
fn deletes(pairs: &[(u8, u64)]) -> HashMap<ResumeKey, u64> {
pairs.iter().map(|(k, s)| (key(*k), *s)).collect()
}
#[test]
fn merge_empty_disk_and_empty_candidate_is_empty() {
let result = merge_monotonic(map(&[]), map(&[]), &deletes(&[]));
assert!(result.is_empty());
}
#[test]
fn merge_empty_disk_keeps_candidate_as_is() {
let result = merge_monotonic(map(&[(1, 5)]), map(&[]), &deletes(&[]));
assert_eq!(result.get(&key(1)).unwrap().last_committed_sequence, 5);
}
#[test]
fn merge_preserves_disk_keys_unknown_to_candidate() {
let result = merge_monotonic(map(&[]), map(&[(1, 7)]), &deletes(&[]));
assert_eq!(result.get(&key(1)).unwrap().last_committed_sequence, 7);
}
#[test]
fn merge_disk_wins_when_disk_seq_greater_than_candidate_seq() {
let result = merge_monotonic(map(&[(1, 0)]), map(&[(1, 9)]), &deletes(&[]));
assert_eq!(result.get(&key(1)).unwrap().last_committed_sequence, 9);
}
#[test]
fn merge_candidate_wins_when_candidate_seq_greater_than_disk_seq() {
let result = merge_monotonic(map(&[(1, 15)]), map(&[(1, 10)]), &deletes(&[]));
assert_eq!(result.get(&key(1)).unwrap().last_committed_sequence, 15);
}
#[test]
fn merge_disk_wins_when_seqs_are_equal() {
let result = merge_monotonic(map(&[(1, 5)]), map(&[(1, 5)]), &deletes(&[]));
assert_eq!(result.get(&key(1)).unwrap().last_committed_sequence, 5);
}
#[test]
fn merge_honours_delete_when_disk_seq_matches_pre_state() {
let result = merge_monotonic(map(&[]), map(&[(1, 5)]), &deletes(&[(1, 5)]));
assert!(result.is_empty());
}
#[test]
fn merge_suppresses_delete_when_disk_advanced_past_pre_state() {
let result = merge_monotonic(map(&[]), map(&[(1, 10)]), &deletes(&[(1, 5)]));
assert_eq!(result.get(&key(1)).unwrap().last_committed_sequence, 10);
}
#[test]
fn merge_honours_delete_when_disk_has_no_key() {
let result = merge_monotonic(map(&[]), map(&[]), &deletes(&[(1, 5)]));
assert!(result.is_empty());
}
#[test]
fn merge_mixed_put_and_delete_against_concurrent_writes() {
let result = merge_monotonic(
map(&[(2, 20)]),
map(&[(1, 10), (2, 5)]),
&deletes(&[(1, 2)]),
);
assert_eq!(result.get(&key(1)).unwrap().last_committed_sequence, 10);
assert_eq!(result.get(&key(2)).unwrap().last_committed_sequence, 20);
}
}