use std::collections::HashMap;
use std::sync::Mutex;
/// The `from`/`to` pair recorded for a single prefix migration.
///
/// Both fields are stored and handed back verbatim; nothing in this file
/// interprets them.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MigrationState {
    /// Where the prefix is being moved from.
    /// NOTE(review): presumably a node or shard identifier — confirm with callers.
    pub from: String,
    /// Where the prefix is being moved to (same caveat as `from`).
    pub to: String,
}
/// Tracks prefix migrations in two independently locked maps keyed by the
/// raw prefix bytes.
///
/// `Default` yields a table with both maps empty.
#[derive(Debug, Default)]
pub struct MigrationTable {
    /// Migrations that have been started but neither committed nor aborted.
    migrating: Mutex<HashMap<Vec<u8>, MigrationState>>,
    /// Migrations that have been committed.
    migrated: Mutex<HashMap<Vec<u8>, MigrationState>>,
}
/// Reasons a new migration cannot be started for a prefix.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MigrationError {
    /// The prefix already has an in-flight migration.
    AlreadyMigrating,
    /// The prefix has already been migrated (a migration for it was committed).
    AlreadyMigrated,
}
impl std::fmt::Display for MigrationError {
    /// Writes a fixed, human-readable sentence for each variant.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Pick the static message first, then emit it with a single write.
        let message = match self {
            Self::AlreadyMigrating => "migration for this prefix is already in flight",
            Self::AlreadyMigrated => "prefix has already been migrated",
        };
        f.write_str(message)
    }
}
// Marker impl with no overridden methods: lets `MigrationError` be used
// wherever a `std::error::Error` is expected (e.g. `Box<dyn Error>`).
impl std::error::Error for MigrationError {}
impl MigrationTable {
#[must_use]
pub fn new() -> Self {
Self::default()
}
pub fn start(
&self,
prefix: Vec<u8>,
from: String,
to: String,
) -> Result<(), MigrationError> {
let mut mig = self
.migrating
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
if mig.contains_key(&prefix) {
return Err(MigrationError::AlreadyMigrating);
}
let done = self
.migrated
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
if done.contains_key(&prefix) {
return Err(MigrationError::AlreadyMigrated);
}
drop(done);
mig.insert(prefix, MigrationState { from, to });
Ok(())
}
pub fn commit(&self, prefix: &[u8]) -> Option<MigrationState> {
let mut mig = self
.migrating
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let entry = mig.remove(prefix)?;
drop(mig);
let mut done = self
.migrated
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
done.insert(prefix.to_vec(), entry.clone());
Some(entry)
}
pub fn abort(&self, prefix: &[u8]) -> Option<MigrationState> {
let mut mig = self
.migrating
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
mig.remove(prefix)
}
#[must_use]
pub fn lookup_migrating(&self, prefix: &[u8]) -> Option<MigrationState> {
self.migrating
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.get(prefix)
.cloned()
}
#[must_use]
pub fn lookup_migrated(&self, prefix: &[u8]) -> Option<MigrationState> {
self.migrated
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.get(prefix)
.cloned()
}
#[must_use]
pub fn match_migrating(&self, key: &[u8]) -> Option<MigrationState> {
let g = self
.migrating
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let mut best: Option<(&Vec<u8>, &MigrationState)> = None;
for (p, st) in g.iter() {
if key.starts_with(p)
&& best.is_none_or(|(prev, _)| p.len() > prev.len())
{
best = Some((p, st));
}
}
best.map(|(_, st)| st.clone())
}
#[must_use]
pub fn match_migrated(&self, key: &[u8]) -> Option<MigrationState> {
let g = self
.migrated
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let mut best: Option<(&Vec<u8>, &MigrationState)> = None;
for (p, st) in g.iter() {
if key.starts_with(p)
&& best.is_none_or(|(prev, _)| p.len() > prev.len())
{
best = Some((p, st));
}
}
best.map(|(_, st)| st.clone())
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Starts a migration of `prefix` from `from` to `to`, panicking on error.
    fn begin(t: &MigrationTable, prefix: &[u8], from: &str, to: &str) {
        t.start(prefix.to_vec(), from.into(), to.into()).unwrap();
    }

    #[test]
    fn start_then_lookup() {
        let t = MigrationTable::new();
        begin(&t, b"app:billing:", "A", "B");
        let st = t.lookup_migrating(b"app:billing:").unwrap();
        assert_eq!(st.from, "A");
        assert_eq!(st.to, "B");
    }

    #[test]
    fn start_double_errs_already_migrating() {
        let t = MigrationTable::new();
        begin(&t, b"p:", "A", "B");
        let err = t.start(b"p:".to_vec(), "A".into(), "B".into()).unwrap_err();
        assert_eq!(err, MigrationError::AlreadyMigrating);
    }

    #[test]
    fn start_after_commit_errs_already_migrated() {
        let t = MigrationTable::new();
        begin(&t, b"p:", "A", "B");
        t.commit(b"p:").unwrap();
        let err = t.start(b"p:".to_vec(), "B".into(), "C".into()).unwrap_err();
        assert_eq!(err, MigrationError::AlreadyMigrated);
        // The rejected start must not leave an in-flight entry behind.
        assert!(t.lookup_migrating(b"p:").is_none());
    }

    #[test]
    fn commit_moves_to_migrated() {
        let t = MigrationTable::new();
        begin(&t, b"p:", "A", "B");
        let committed = t.commit(b"p:").unwrap();
        assert_eq!(committed.to, "B");
        assert!(t.lookup_migrating(b"p:").is_none());
        assert_eq!(t.lookup_migrated(b"p:").map(|s| s.to), Some("B".into()));
    }

    #[test]
    fn commit_unknown_prefix_is_none() {
        let t = MigrationTable::new();
        assert!(t.commit(b"p:").is_none());
        assert!(t.lookup_migrated(b"p:").is_none());
    }

    #[test]
    fn abort_drops_migrating() {
        let t = MigrationTable::new();
        begin(&t, b"p:", "A", "B");
        t.abort(b"p:").unwrap();
        assert!(t.lookup_migrating(b"p:").is_none());
        assert!(t.lookup_migrated(b"p:").is_none());
    }

    #[test]
    fn abort_unknown_prefix_is_none() {
        let t = MigrationTable::new();
        assert!(t.abort(b"p:").is_none());
    }

    #[test]
    fn match_migrating_longest_prefix_wins() {
        let t = MigrationTable::new();
        begin(&t, b"app:", "A", "B");
        begin(&t, b"app:billing:", "B", "C");
        let st = t.match_migrating(b"app:billing:x").unwrap();
        assert_eq!(st.from, "B");
    }

    #[test]
    fn match_migrated_longest_prefix_wins() {
        let t = MigrationTable::new();
        begin(&t, b"app:", "A", "B");
        begin(&t, b"app:billing:", "B", "C");
        t.commit(b"app:").unwrap();
        t.commit(b"app:billing:").unwrap();
        let st = t.match_migrated(b"app:billing:x").unwrap();
        assert_eq!(st.from, "B");
        assert_eq!(t.match_migrated(b"app:other").unwrap().from, "A");
    }

    #[test]
    fn match_returns_none_without_matching_prefix() {
        let t = MigrationTable::new();
        begin(&t, b"app:", "A", "B");
        assert!(t.match_migrating(b"other:key").is_none());
        assert!(t.match_migrated(b"app:key").is_none());
    }

    #[test]
    fn error_messages_are_human_readable() {
        assert_eq!(
            MigrationError::AlreadyMigrating.to_string(),
            "migration for this prefix is already in flight"
        );
        assert_eq!(
            MigrationError::AlreadyMigrated.to_string(),
            "prefix has already been migrated"
        );
    }
}