Skip to main content

osproxy_tenancy/
placement_table.rs

//! An in-memory, epoch-versioned placement table.
//!
//! Maps each partition to its current [`Placement`] and stamps every change
//! with a fresh, monotonically increasing [`Epoch`]. The epoch is a logical
//! generation counter (no wall clock, which keeps the table deterministic;
//! `docs/12`) that flows onto writes so the sink can reject a stale-epoch write
//! during a migration (`docs/06` §2).
//!
//! This is the M1 backend: a process-local table seeded by the operator. The
//! fleet-wide watched store (etcd/Consul/…) arrives in M7 behind the same
//! lookup shape (`docs/11`).
12
13use std::collections::HashMap;
14use std::sync::atomic::{AtomicU64, Ordering};
15use std::sync::RwLock;
16
17use osproxy_core::{Epoch, PartitionId};
18use osproxy_spi::{MigrationPhase, Placement, PlacementAt};
19
20use crate::migration::{MigrationError, PartitionState, Phase, WriteAdmission};
21
22/// Maps a partition's internal state to the shape-only [`MigrationPhase`] label
23/// surfaced through [`PlacementAt`] for observability (`docs/06` §5).
24fn migration_phase(state: &PartitionState) -> MigrationPhase {
25    match state {
26        PartitionState::Active(_) => MigrationPhase::Settled,
27        PartitionState::Migrating {
28            phase: Phase::Draining,
29            ..
30        } => MigrationPhase::Draining,
31        PartitionState::Migrating {
32            phase: Phase::Cutover,
33            ..
34        } => MigrationPhase::Cutover,
35    }
36}
37
38/// One partition's migration state plus the epoch it was last stamped at. Every
39/// `set`/transition advances the epoch, so a stamped decision can be recognized
40/// as resolved against a superseded generation (`docs/06` §2).
41#[derive(Clone, Debug)]
42struct Entry {
43    state: PartitionState,
44    epoch: Epoch,
45}
46
47/// A concurrent, epoch-versioned map from partition to placement, carrying each
48/// partition's migration state machine (`docs/06`).
49///
50/// Cloneable handles are not provided here; wrap in an `Arc` to share. All
51/// methods are non-blocking beyond a short critical section. Transitions are
52/// total: an inapplicable transition returns a [`MigrationError`] and leaves the
53/// table unchanged.
54#[derive(Debug)]
55pub struct PlacementTable {
56    // A read-mostly map: lookups vastly outnumber migrations. `RwLock` lets
57    // concurrent routing reads proceed in parallel.
58    entries: RwLock<HashMap<PartitionId, Entry>>,
59    // The generation counter. Every `set`/transition pre-increments it, so the
60    // first placement gets epoch 1 and `Epoch::ZERO` always means "never placed".
61    generation: AtomicU64,
62}
63
64impl PlacementTable {
65    /// Creates an empty table at generation zero.
66    #[must_use]
67    pub fn new() -> Self {
68        Self {
69            entries: RwLock::new(HashMap::new()),
70            generation: AtomicU64::new(0),
71        }
72    }
73
74    /// Registers (or replaces) the placement for `partition` as `Active`,
75    /// stamping a fresh epoch and returning it. Initial registration; an
76    /// in-flight migration uses the phase transitions below, not `set`.
77    pub fn set(&self, partition: PartitionId, placement: Placement) -> Epoch {
78        let epoch = self.next_epoch();
79        self.write_lock().insert(
80            partition,
81            Entry::new(PartitionState::Active(placement), epoch),
82        );
83        epoch
84    }
85
86    /// Begins migrating `partition` to `to`: `Active(from)` → `Migrating`
87    /// `Draining`. Writes still go to `from`; the epoch advances.
88    ///
89    /// # Errors
90    /// [`MigrationError::AlreadyMigrating`] if a migration is already in flight,
91    /// [`MigrationError::UnknownPartition`] if the partition has no placement.
92    pub fn begin_migration(
93        &self,
94        partition: &PartitionId,
95        to: Placement,
96    ) -> Result<Epoch, MigrationError> {
97        self.transition(partition, |state| match state {
98            PartitionState::Active(from) => Ok(PartitionState::Migrating {
99                from,
100                to,
101                phase: Phase::Draining,
102            }),
103            PartitionState::Migrating { .. } => Err(MigrationError::AlreadyMigrating),
104        })
105    }
106
107    /// Enters the cutover window: `Draining` → `Cutover`. Writes are now rejected
108    /// until [`complete_migration`](Self::complete_migration) flips the pointer.
109    ///
110    /// # Errors
111    /// [`MigrationError::NotMigrating`] if settled, [`MigrationError::NotDraining`]
112    /// if already past draining.
113    pub fn enter_cutover(&self, partition: &PartitionId) -> Result<Epoch, MigrationError> {
114        self.transition(partition, |state| match state {
115            PartitionState::Migrating {
116                from,
117                to,
118                phase: Phase::Draining,
119            } => Ok(PartitionState::Migrating {
120                from,
121                to,
122                phase: Phase::Cutover,
123            }),
124            PartitionState::Migrating { .. } => Err(MigrationError::NotDraining),
125            PartitionState::Active(_) => Err(MigrationError::NotMigrating),
126        })
127    }
128
129    /// Completes the migration, the pointer flip: `Cutover` → `Active(to)`.
130    ///
131    /// # Errors
132    /// [`MigrationError::NotMigrating`] if settled, [`MigrationError::NotCutover`]
133    /// if not yet in cutover.
134    pub fn complete_migration(&self, partition: &PartitionId) -> Result<Epoch, MigrationError> {
135        self.transition(partition, |state| match state {
136            PartitionState::Migrating {
137                to,
138                phase: Phase::Cutover,
139                ..
140            } => Ok(PartitionState::Active(to)),
141            PartitionState::Migrating { .. } => Err(MigrationError::NotCutover),
142            PartitionState::Active(_) => Err(MigrationError::NotMigrating),
143        })
144    }
145
146    /// Aborts an in-flight migration, returning the partition to `Active(from)`.
147    /// Since writes never committed to `to` (Draining wrote to `from`, Cutover
148    /// rejected), no rollback of data is needed (INV-M3).
149    ///
150    /// # Errors
151    /// [`MigrationError::NotMigrating`] if the partition is settled.
152    pub fn abort_migration(&self, partition: &PartitionId) -> Result<Epoch, MigrationError> {
153        self.transition(partition, |state| match state {
154            PartitionState::Migrating { from, .. } => Ok(PartitionState::Active(from)),
155            PartitionState::Active(_) => Err(MigrationError::NotMigrating),
156        })
157    }
158
159    /// The current migration state and the epoch it was stamped at, or `None`.
160    /// For observability and the control plane (`docs/06` §5).
161    #[must_use]
162    pub fn state(&self, partition: &PartitionId) -> Option<(PartitionState, Epoch)> {
163        self.read_lock()
164            .get(partition)
165            .map(|e| (e.state.clone(), e.epoch))
166    }
167
168    /// Resolves the placement reads go to (and its epoch), or `None`. The single
169    /// read placement, `from` until a migration completes, so a read never
170    /// sees a split view (INV-M4). The routing entry point.
171    #[must_use]
172    pub fn get(&self, partition: &PartitionId) -> Option<PlacementAt> {
173        self.read_lock().get(partition).map(|e| {
174            PlacementAt::new(e.state.read_placement().clone(), e.epoch)
175                .with_phase(migration_phase(&e.state))
176        })
177    }
178
179    /// The migration write gate (`docs/06` §2): may a write resolved at `epoch`
180    /// for `partition` commit now? [`WriteAdmission::Admit`] only if writes are
181    /// currently allowed (not in the `Cutover` window) *and* the partition's
182    /// epoch is unchanged since the decision was resolved, otherwise
183    /// [`WriteAdmission::Reject`], which the caller surfaces as a retryable
184    /// stale-epoch error so the client re-resolves and retries.
185    ///
186    /// Epoch equality is the per-partition staleness check (`epoch` only advances
187    /// on *this* partition's transitions), and the cutover gate handles the one
188    /// window where a write resolved at the *current* epoch must still be held:
189    /// together they give INV-M1 (no write in cutover) and INV-M2 (no write
190    /// against a superseded placement after the flip).
191    #[must_use]
192    pub fn admit_write(&self, partition: &PartitionId, epoch: Epoch) -> WriteAdmission {
193        let admit = self
194            .read_lock()
195            .get(partition)
196            .is_some_and(|e| e.state.write_placement().is_some() && e.epoch == epoch);
197        if admit {
198            WriteAdmission::Admit
199        } else {
200            WriteAdmission::Reject
201        }
202    }
203
204    /// The current generation of the table (the epoch the most recent change
205    /// produced, or [`Epoch::ZERO`] if empty).
206    #[must_use]
207    pub fn current_epoch(&self) -> Epoch {
208        Epoch::new(self.generation.load(Ordering::SeqCst))
209    }
210
211    /// Allocates the next monotonic epoch (generation counter pre-increment).
212    fn next_epoch(&self) -> Epoch {
213        Epoch::new(self.generation.fetch_add(1, Ordering::SeqCst) + 1)
214    }
215
216    /// Applies a state transition under the write lock: `f` maps the current
217    /// state to the next one (or a [`MigrationError`]). On success the entry is
218    /// replaced and stamped with a fresh epoch; on any error the table is
219    /// untouched (transitions are atomic and side-effect-free on failure).
220    fn transition(
221        &self,
222        partition: &PartitionId,
223        f: impl FnOnce(PartitionState) -> Result<PartitionState, MigrationError>,
224    ) -> Result<Epoch, MigrationError> {
225        let mut entries = self.write_lock();
226        let current = entries
227            .get(partition)
228            .ok_or(MigrationError::UnknownPartition)?;
229        let next = f(current.state.clone())?;
230        let epoch = self.next_epoch();
231        entries.insert(partition.clone(), Entry::new(next, epoch));
232        Ok(epoch)
233    }
234
235    /// Acquires the read lock, recovering from a poisoned lock.
236    ///
237    /// A poisoned lock means a writer panicked mid-update. The stored data is a
238    /// plain map (no broken invariant a panic could leave torn), so recovering
239    /// the guard is safe and keeps routing available, far better than
240    /// propagating a panic onto every request path (NFR-R1).
241    fn read_lock(&self) -> std::sync::RwLockReadGuard<'_, HashMap<PartitionId, Entry>> {
242        self.entries
243            .read()
244            .unwrap_or_else(std::sync::PoisonError::into_inner)
245    }
246
247    /// Acquires the write lock, recovering from a poisoned lock (see
248    /// [`PlacementTable::read_lock`]).
249    fn write_lock(&self) -> std::sync::RwLockWriteGuard<'_, HashMap<PartitionId, Entry>> {
250        self.entries
251            .write()
252            .unwrap_or_else(std::sync::PoisonError::into_inner)
253    }
254}
255
256impl Entry {
257    fn new(state: PartitionState, epoch: Epoch) -> Self {
258        Self { state, epoch }
259    }
260}
261
262impl Default for PlacementTable {
263    fn default() -> Self {
264        Self::new()
265    }
266}
267
#[cfg(test)]
mod tests {
    use super::*;
    use osproxy_core::{ClusterId, IndexName};

    /// Builds a shared-index placement on `cluster`/`index` with nothing to
    /// inject.
    fn shared(cluster: &str, index: &str) -> Placement {
        let cluster = ClusterId::from(cluster);
        let index = IndexName::from(index);
        Placement::SharedIndex {
            cluster,
            index,
            inject: Vec::new(),
        }
    }

    /// A fresh table knows no partitions and still sits at generation zero.
    #[test]
    fn missing_partition_resolves_to_none() {
        let table = PlacementTable::new();
        let absent = PartitionId::from("absent");
        assert!(table.get(&absent).is_none());
        assert_eq!(table.current_epoch(), Epoch::ZERO);
    }

    /// Epochs start at 1 and grow with each `set`, across partitions.
    #[test]
    fn set_assigns_monotonic_epochs() {
        let table = PlacementTable::new();
        let first = table.set(PartitionId::from("a"), shared("c", "i"));
        let second = table.set(PartitionId::from("b"), shared("c", "i"));
        assert_eq!(first, Epoch::new(1));
        assert_eq!(second, Epoch::new(2));
        assert!(second > first);
        assert_eq!(table.current_epoch(), second);
    }

    /// Re-`set`ting a partition swaps its placement and stamps a newer epoch.
    #[test]
    fn migration_replaces_placement_and_advances_epoch() {
        let table = PlacementTable::new();
        let partition = PartitionId::from("t");

        table.set(partition.clone(), shared("old", "i"));
        let before = table.get(&partition).unwrap();
        assert_eq!(before.placement.cluster().as_str(), "old");

        let migrated = table.set(partition.clone(), shared("new", "i"));
        let after = table.get(&partition).unwrap();
        assert_eq!(after.placement.cluster().as_str(), "new");
        assert_eq!(after.epoch, migrated);
        assert!(after.epoch > before.epoch);
    }
}
313}