use std::collections::HashMap;
use std::sync::Mutex;

/// Source and destination recorded for the migration of one key prefix.
///
/// The table stores and returns these values verbatim; nothing in this file
/// interprets the contents of either string.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MigrationState {
    /// Identifier the prefix is migrating from.
    pub from: String,
    /// Identifier the prefix is migrating to.
    pub to: String,
}

34#[derive(Debug, Default)]
38pub struct MigrationTable {
39 migrating: Mutex<HashMap<Vec<u8>, MigrationState>>,
40 migrated: Mutex<HashMap<Vec<u8>, MigrationState>>,
41}
42
/// Reasons a new migration cannot be registered for a prefix.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MigrationError {
    /// The prefix already has a migration in flight.
    AlreadyMigrating,
    /// The prefix already has a committed migration.
    AlreadyMigrated,
}

55impl std::fmt::Display for MigrationError {
56 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
57 match self {
58 Self::AlreadyMigrating => write!(f, "migration for this prefix is already in flight"),
59 Self::AlreadyMigrated => write!(f, "prefix has already been migrated"),
60 }
61 }
62}
63
64impl std::error::Error for MigrationError {}
65
66impl MigrationTable {
67 #[must_use]
69 pub fn new() -> Self {
70 Self::default()
71 }
72
73 pub fn start(
78 &self,
79 prefix: Vec<u8>,
80 from: String,
81 to: String,
82 ) -> Result<(), MigrationError> {
83 let mut mig = self
84 .migrating
85 .lock()
86 .unwrap_or_else(std::sync::PoisonError::into_inner);
87 if mig.contains_key(&prefix) {
88 return Err(MigrationError::AlreadyMigrating);
89 }
90 let done = self
91 .migrated
92 .lock()
93 .unwrap_or_else(std::sync::PoisonError::into_inner);
94 if done.contains_key(&prefix) {
95 return Err(MigrationError::AlreadyMigrated);
96 }
97 drop(done);
98 mig.insert(prefix, MigrationState { from, to });
99 Ok(())
100 }
101
102 pub fn commit(&self, prefix: &[u8]) -> Option<MigrationState> {
106 let mut mig = self
107 .migrating
108 .lock()
109 .unwrap_or_else(std::sync::PoisonError::into_inner);
110 let entry = mig.remove(prefix)?;
111 drop(mig);
112 let mut done = self
113 .migrated
114 .lock()
115 .unwrap_or_else(std::sync::PoisonError::into_inner);
116 done.insert(prefix.to_vec(), entry.clone());
117 Some(entry)
118 }
119
120 pub fn abort(&self, prefix: &[u8]) -> Option<MigrationState> {
125 let mut mig = self
126 .migrating
127 .lock()
128 .unwrap_or_else(std::sync::PoisonError::into_inner);
129 mig.remove(prefix)
130 }
131
132 #[must_use]
137 pub fn lookup_migrating(&self, prefix: &[u8]) -> Option<MigrationState> {
138 self.migrating
139 .lock()
140 .unwrap_or_else(std::sync::PoisonError::into_inner)
141 .get(prefix)
142 .cloned()
143 }
144
145 #[must_use]
148 pub fn lookup_migrated(&self, prefix: &[u8]) -> Option<MigrationState> {
149 self.migrated
150 .lock()
151 .unwrap_or_else(std::sync::PoisonError::into_inner)
152 .get(prefix)
153 .cloned()
154 }
155
156 #[must_use]
161 pub fn match_migrating(&self, key: &[u8]) -> Option<MigrationState> {
162 let g = self
163 .migrating
164 .lock()
165 .unwrap_or_else(std::sync::PoisonError::into_inner);
166 let mut best: Option<(&Vec<u8>, &MigrationState)> = None;
170 for (p, st) in g.iter() {
171 if key.starts_with(p)
172 && best.is_none_or(|(prev, _)| p.len() > prev.len())
173 {
174 best = Some((p, st));
175 }
176 }
177 best.map(|(_, st)| st.clone())
178 }
179
180 #[must_use]
182 pub fn match_migrated(&self, key: &[u8]) -> Option<MigrationState> {
183 let g = self
184 .migrated
185 .lock()
186 .unwrap_or_else(std::sync::PoisonError::into_inner);
187 let mut best: Option<(&Vec<u8>, &MigrationState)> = None;
188 for (p, st) in g.iter() {
189 if key.starts_with(p)
190 && best.is_none_or(|(prev, _)| p.len() > prev.len())
191 {
192 best = Some((p, st));
193 }
194 }
195 best.map(|(_, st)| st.clone())
196 }
197}
198
#[cfg(test)]
mod tests {
    use super::*;

    /// A started migration is visible through an exact in-flight lookup.
    #[test]
    fn start_then_lookup() {
        let t = MigrationTable::new();
        t.start(b"app:billing:".to_vec(), "A".into(), "B".into())
            .unwrap();
        let st = t.lookup_migrating(b"app:billing:").unwrap();
        assert_eq!(st.from, "A");
        assert_eq!(st.to, "B");
    }

    /// Starting the same prefix twice is rejected while the first is in flight.
    #[test]
    fn start_double_errs_already_migrating() {
        let t = MigrationTable::new();
        t.start(b"p:".to_vec(), "A".into(), "B".into()).unwrap();
        let err = t.start(b"p:".to_vec(), "A".into(), "B".into()).unwrap_err();
        assert_eq!(err, MigrationError::AlreadyMigrating);
    }

    /// Starting a prefix that was already committed is rejected.
    #[test]
    fn start_after_commit_errs_already_migrated() {
        let t = MigrationTable::new();
        t.start(b"p:".to_vec(), "A".into(), "B".into()).unwrap();
        t.commit(b"p:").unwrap();
        let err = t.start(b"p:".to_vec(), "A".into(), "B".into()).unwrap_err();
        assert_eq!(err, MigrationError::AlreadyMigrated);
        assert!(t.lookup_migrating(b"p:").is_none());
    }

    /// Commit removes the in-flight entry and records it as committed.
    #[test]
    fn commit_moves_to_migrated() {
        let t = MigrationTable::new();
        t.start(b"p:".to_vec(), "A".into(), "B".into()).unwrap();
        let committed = t.commit(b"p:").unwrap();
        assert_eq!(committed.to, "B");
        assert!(t.lookup_migrating(b"p:").is_none());
        assert_eq!(t.lookup_migrated(b"p:").map(|s| s.to), Some("B".into()));
    }

    /// Abort removes the in-flight entry without recording it as committed.
    #[test]
    fn abort_drops_migrating() {
        let t = MigrationTable::new();
        t.start(b"p:".to_vec(), "A".into(), "B".into()).unwrap();
        t.abort(b"p:").unwrap();
        assert!(t.lookup_migrating(b"p:").is_none());
        assert!(t.lookup_migrated(b"p:").is_none());
    }

    /// Commit and abort on a prefix that was never started return `None`.
    #[test]
    fn commit_and_abort_unknown_prefix_return_none() {
        let t = MigrationTable::new();
        assert!(t.commit(b"nope:").is_none());
        assert!(t.abort(b"nope:").is_none());
        assert!(t.lookup_migrated(b"nope:").is_none());
    }

    /// With nested in-flight prefixes, the most specific one is returned.
    #[test]
    fn match_migrating_longest_prefix_wins() {
        let t = MigrationTable::new();
        t.start(b"app:".to_vec(), "A".into(), "B".into()).unwrap();
        t.start(b"app:billing:".to_vec(), "B".into(), "C".into())
            .unwrap();
        let st = t.match_migrating(b"app:billing:x").unwrap();
        assert_eq!(st.from, "B");
    }

    /// With nested committed prefixes, the most specific one is returned.
    #[test]
    fn match_migrated_longest_prefix_wins() {
        let t = MigrationTable::new();
        t.start(b"app:".to_vec(), "A".into(), "B".into()).unwrap();
        t.start(b"app:billing:".to_vec(), "B".into(), "C".into())
            .unwrap();
        t.commit(b"app:").unwrap();
        t.commit(b"app:billing:").unwrap();
        let st = t.match_migrated(b"app:billing:x").unwrap();
        assert_eq!(st.from, "B");
        assert!(t.match_migrating(b"app:billing:x").is_none());
    }

    /// A key that starts with no registered prefix matches nothing.
    #[test]
    fn match_returns_none_without_matching_prefix() {
        let t = MigrationTable::new();
        t.start(b"app:".to_vec(), "A".into(), "B".into()).unwrap();
        assert!(t.match_migrating(b"web:x").is_none());
        assert!(t.match_migrated(b"app:x").is_none());
    }
}