osproxy_tenancy/placement_table.rs
1//! An in-memory, epoch-versioned placement table.
2//!
//! Maps each partition to its current [`Placement`] and stamps every change
//! with a fresh, monotonically increasing [`Epoch`]. The epoch is a logical
//! generation counter (not a wall-clock time, which keeps the table
//! deterministic; see `docs/12`) that flows onto writes so the sink can reject
//! a stale-epoch write during a migration (`docs/06` §2).
8//!
9//! This is the M1 backend: a process-local table seeded by the operator. The
10//! fleet-wide watched store (etcd/Consul/…) arrives in M7 behind the same
11//! lookup shape (`docs/11`).
12
13use std::collections::HashMap;
14use std::sync::atomic::{AtomicU64, Ordering};
15use std::sync::RwLock;
16
17use osproxy_core::{Epoch, PartitionId};
18use osproxy_spi::{MigrationPhase, Placement, PlacementAt};
19
20use crate::migration::{MigrationError, PartitionState, Phase, WriteAdmission};
21
22/// Maps a partition's internal state to the shape-only [`MigrationPhase`] label
23/// surfaced through [`PlacementAt`] for observability (`docs/06` §5).
24fn migration_phase(state: &PartitionState) -> MigrationPhase {
25 match state {
26 PartitionState::Active(_) => MigrationPhase::Settled,
27 PartitionState::Migrating {
28 phase: Phase::Draining,
29 ..
30 } => MigrationPhase::Draining,
31 PartitionState::Migrating {
32 phase: Phase::Cutover,
33 ..
34 } => MigrationPhase::Cutover,
35 }
36}
37
/// One partition's migration state plus the epoch it was last stamped at. Every
/// `set`/transition advances the epoch, so a stamped decision can be recognized
/// as resolved against a superseded generation (`docs/06` §2).
#[derive(Clone, Debug)]
struct Entry {
    /// Where the partition currently sits in its migration state machine.
    state: PartitionState,
    /// The epoch stamped by the change that produced `state`.
    epoch: Epoch,
}
46
/// A concurrent, epoch-versioned map from partition to placement, carrying each
/// partition's migration state machine (`docs/06`).
///
/// Cloneable handles are not provided here; wrap the table in an `Arc` to share
/// it. All methods are non-blocking beyond a short critical section.
/// Transitions are total: an inapplicable transition returns a
/// [`MigrationError`] and leaves the table unchanged.
#[derive(Debug)]
pub struct PlacementTable {
    // A read-mostly map: lookups vastly outnumber migrations. `RwLock` lets
    // concurrent routing reads proceed in parallel.
    entries: RwLock<HashMap<PartitionId, Entry>>,
    // The generation counter, a single counter shared by every partition in
    // the table. Every `set`/transition pre-increments it, so the first
    // placement gets epoch 1 and `Epoch::ZERO` always means "never placed".
    generation: AtomicU64,
}
63
64impl PlacementTable {
65 /// Creates an empty table at generation zero.
66 #[must_use]
67 pub fn new() -> Self {
68 Self {
69 entries: RwLock::new(HashMap::new()),
70 generation: AtomicU64::new(0),
71 }
72 }
73
74 /// Registers (or replaces) the placement for `partition` as `Active`,
75 /// stamping a fresh epoch and returning it. Initial registration; an
76 /// in-flight migration uses the phase transitions below, not `set`.
77 pub fn set(&self, partition: PartitionId, placement: Placement) -> Epoch {
78 let epoch = self.next_epoch();
79 self.write_lock().insert(
80 partition,
81 Entry::new(PartitionState::Active(placement), epoch),
82 );
83 epoch
84 }
85
86 /// Begins migrating `partition` to `to`: `Active(from)` → `Migrating`
87 /// `Draining`. Writes still go to `from`; the epoch advances.
88 ///
89 /// # Errors
90 /// [`MigrationError::AlreadyMigrating`] if a migration is already in flight,
91 /// [`MigrationError::UnknownPartition`] if the partition has no placement.
92 pub fn begin_migration(
93 &self,
94 partition: &PartitionId,
95 to: Placement,
96 ) -> Result<Epoch, MigrationError> {
97 self.transition(partition, |state| match state {
98 PartitionState::Active(from) => Ok(PartitionState::Migrating {
99 from,
100 to,
101 phase: Phase::Draining,
102 }),
103 PartitionState::Migrating { .. } => Err(MigrationError::AlreadyMigrating),
104 })
105 }
106
107 /// Enters the cutover window: `Draining` → `Cutover`. Writes are now rejected
108 /// until [`complete_migration`](Self::complete_migration) flips the pointer.
109 ///
110 /// # Errors
111 /// [`MigrationError::NotMigrating`] if settled, [`MigrationError::NotDraining`]
112 /// if already past draining.
113 pub fn enter_cutover(&self, partition: &PartitionId) -> Result<Epoch, MigrationError> {
114 self.transition(partition, |state| match state {
115 PartitionState::Migrating {
116 from,
117 to,
118 phase: Phase::Draining,
119 } => Ok(PartitionState::Migrating {
120 from,
121 to,
122 phase: Phase::Cutover,
123 }),
124 PartitionState::Migrating { .. } => Err(MigrationError::NotDraining),
125 PartitionState::Active(_) => Err(MigrationError::NotMigrating),
126 })
127 }
128
129 /// Completes the migration, the pointer flip: `Cutover` → `Active(to)`.
130 ///
131 /// # Errors
132 /// [`MigrationError::NotMigrating`] if settled, [`MigrationError::NotCutover`]
133 /// if not yet in cutover.
134 pub fn complete_migration(&self, partition: &PartitionId) -> Result<Epoch, MigrationError> {
135 self.transition(partition, |state| match state {
136 PartitionState::Migrating {
137 to,
138 phase: Phase::Cutover,
139 ..
140 } => Ok(PartitionState::Active(to)),
141 PartitionState::Migrating { .. } => Err(MigrationError::NotCutover),
142 PartitionState::Active(_) => Err(MigrationError::NotMigrating),
143 })
144 }
145
146 /// Aborts an in-flight migration, returning the partition to `Active(from)`.
147 /// Since writes never committed to `to` (Draining wrote to `from`, Cutover
148 /// rejected), no rollback of data is needed (INV-M3).
149 ///
150 /// # Errors
151 /// [`MigrationError::NotMigrating`] if the partition is settled.
152 pub fn abort_migration(&self, partition: &PartitionId) -> Result<Epoch, MigrationError> {
153 self.transition(partition, |state| match state {
154 PartitionState::Migrating { from, .. } => Ok(PartitionState::Active(from)),
155 PartitionState::Active(_) => Err(MigrationError::NotMigrating),
156 })
157 }
158
159 /// The current migration state and the epoch it was stamped at, or `None`.
160 /// For observability and the control plane (`docs/06` §5).
161 #[must_use]
162 pub fn state(&self, partition: &PartitionId) -> Option<(PartitionState, Epoch)> {
163 self.read_lock()
164 .get(partition)
165 .map(|e| (e.state.clone(), e.epoch))
166 }
167
168 /// Resolves the placement reads go to (and its epoch), or `None`. The single
169 /// read placement, `from` until a migration completes, so a read never
170 /// sees a split view (INV-M4). The routing entry point.
171 #[must_use]
172 pub fn get(&self, partition: &PartitionId) -> Option<PlacementAt> {
173 self.read_lock().get(partition).map(|e| {
174 PlacementAt::new(e.state.read_placement().clone(), e.epoch)
175 .with_phase(migration_phase(&e.state))
176 })
177 }
178
179 /// The migration write gate (`docs/06` §2): may a write resolved at `epoch`
180 /// for `partition` commit now? [`WriteAdmission::Admit`] only if writes are
181 /// currently allowed (not in the `Cutover` window) *and* the partition's
182 /// epoch is unchanged since the decision was resolved, otherwise
183 /// [`WriteAdmission::Reject`], which the caller surfaces as a retryable
184 /// stale-epoch error so the client re-resolves and retries.
185 ///
186 /// Epoch equality is the per-partition staleness check (`epoch` only advances
187 /// on *this* partition's transitions), and the cutover gate handles the one
188 /// window where a write resolved at the *current* epoch must still be held:
189 /// together they give INV-M1 (no write in cutover) and INV-M2 (no write
190 /// against a superseded placement after the flip).
191 #[must_use]
192 pub fn admit_write(&self, partition: &PartitionId, epoch: Epoch) -> WriteAdmission {
193 let admit = self
194 .read_lock()
195 .get(partition)
196 .is_some_and(|e| e.state.write_placement().is_some() && e.epoch == epoch);
197 if admit {
198 WriteAdmission::Admit
199 } else {
200 WriteAdmission::Reject
201 }
202 }
203
204 /// The current generation of the table (the epoch the most recent change
205 /// produced, or [`Epoch::ZERO`] if empty).
206 #[must_use]
207 pub fn current_epoch(&self) -> Epoch {
208 Epoch::new(self.generation.load(Ordering::SeqCst))
209 }
210
211 /// Allocates the next monotonic epoch (generation counter pre-increment).
212 fn next_epoch(&self) -> Epoch {
213 Epoch::new(self.generation.fetch_add(1, Ordering::SeqCst) + 1)
214 }
215
216 /// Applies a state transition under the write lock: `f` maps the current
217 /// state to the next one (or a [`MigrationError`]). On success the entry is
218 /// replaced and stamped with a fresh epoch; on any error the table is
219 /// untouched (transitions are atomic and side-effect-free on failure).
220 fn transition(
221 &self,
222 partition: &PartitionId,
223 f: impl FnOnce(PartitionState) -> Result<PartitionState, MigrationError>,
224 ) -> Result<Epoch, MigrationError> {
225 let mut entries = self.write_lock();
226 let current = entries
227 .get(partition)
228 .ok_or(MigrationError::UnknownPartition)?;
229 let next = f(current.state.clone())?;
230 let epoch = self.next_epoch();
231 entries.insert(partition.clone(), Entry::new(next, epoch));
232 Ok(epoch)
233 }
234
235 /// Acquires the read lock, recovering from a poisoned lock.
236 ///
237 /// A poisoned lock means a writer panicked mid-update. The stored data is a
238 /// plain map (no broken invariant a panic could leave torn), so recovering
239 /// the guard is safe and keeps routing available, far better than
240 /// propagating a panic onto every request path (NFR-R1).
241 fn read_lock(&self) -> std::sync::RwLockReadGuard<'_, HashMap<PartitionId, Entry>> {
242 self.entries
243 .read()
244 .unwrap_or_else(std::sync::PoisonError::into_inner)
245 }
246
247 /// Acquires the write lock, recovering from a poisoned lock (see
248 /// [`PlacementTable::read_lock`]).
249 fn write_lock(&self) -> std::sync::RwLockWriteGuard<'_, HashMap<PartitionId, Entry>> {
250 self.entries
251 .write()
252 .unwrap_or_else(std::sync::PoisonError::into_inner)
253 }
254}
255
impl Entry {
    /// Pairs a migration `state` with the `epoch` it was stamped at.
    fn new(state: PartitionState, epoch: Epoch) -> Self {
        Self { state, epoch }
    }
}
261
impl Default for PlacementTable {
    /// Returns the same empty, generation-zero table as [`PlacementTable::new`].
    fn default() -> Self {
        Self::new()
    }
}
267
#[cfg(test)]
mod tests {
    use super::*;
    use osproxy_core::{ClusterId, IndexName};

    /// Builds a shared-index placement on `cluster`/`index` with an empty
    /// `inject` list.
    fn shared(cluster: &str, index: &str) -> Placement {
        let cluster = ClusterId::from(cluster);
        let index = IndexName::from(index);
        Placement::SharedIndex {
            cluster,
            index,
            inject: Vec::new(),
        }
    }

    #[test]
    fn missing_partition_resolves_to_none() {
        let table = PlacementTable::new();
        let absent = PartitionId::from("absent");

        // Nothing registered: no placement, and no epoch handed out yet.
        assert!(table.get(&absent).is_none());
        assert_eq!(table.current_epoch(), Epoch::ZERO);
    }

    #[test]
    fn set_assigns_monotonic_epochs() {
        let table = PlacementTable::new();

        // Two different partitions share the one generation counter.
        let first = table.set(PartitionId::from("a"), shared("c", "i"));
        let second = table.set(PartitionId::from("b"), shared("c", "i"));

        assert_eq!(first, Epoch::new(1));
        assert_eq!(second, Epoch::new(2));
        assert!(second > first);
        assert_eq!(table.current_epoch(), second);
    }

    #[test]
    fn migration_replaces_placement_and_advances_epoch() {
        let table = PlacementTable::new();
        let partition = PartitionId::from("t");

        table.set(partition.clone(), shared("old", "i"));
        let before = table.get(&partition).unwrap();
        assert_eq!(before.placement.cluster().as_str(), "old");

        // Re-registering the same partition swaps the placement and restamps it.
        let migrated = table.set(partition.clone(), shared("new", "i"));
        let after = table.get(&partition).unwrap();
        assert_eq!(after.placement.cluster().as_str(), "new");
        assert_eq!(after.epoch, migrated);
        assert!(after.epoch > before.epoch);
    }
}
313}