osproxy_control/migration.rs
1//! The fleet-safe migration control plane (`docs/06` §5).
2//!
//! The proxy runs as **many instances**, each resolving placement and the write
//! gate by polling the shared backend *fresh on every request*. Nothing about a
//! migration is cached in an instance, so the backend is the single synchronized
//! source of truth (the in-memory [`PlacementTable`] here; a watched store such
//! as etcd/Consul in M7, behind [`MigrationStore`]).
8//!
9//! That makes the routing flip safe *except* for one residual window: a write
10//! whose gate passed an instant **before** cutover was published may still be
11//! committing upstream. So the controller does not flip the pointer immediately:
//! after publishing `Cutover` it holds a **drain barrier** of at least
//! [`DEFAULT_DRAIN_BARRIER`] (≥ the upstream write timeout) before
14//! `complete_migration` is allowed. By then every pre-cutover write has either
15//! committed or hit its deadline, and no in-flight write can land in the old
16//! placement after the flip (INV-M1, INV-M2 fleet-wide).
17//!
18//! Time comes from an injected [`Clock`], so the barrier is deterministic in
19//! tests. One controller drives a given partition's migration (`docs/06` §5:
20//! operator/automation-driven, never AI-mutated).
21//!
22//! The [`MigrationStore`] seam is **async and fallible**: the in-process
23//! [`PlacementTable`] resolves transitions synchronously and never fails on the
24//! backend, but a distributed backend (etcd/Consul) does network I/O and may
25//! report it unreachable via [`MigrationError::Backend`]. The [`ControlPlane`]
26//! treats such a failure as a refused transition that leaves the partition
27//! unchanged, never a half-applied flip.
28
29use std::collections::HashMap;
30use std::sync::{Arc, Mutex};
31use std::time::Duration;
32
33use osproxy_core::{Clock, Epoch, Instant, PartitionId, SystemClock};
34use osproxy_spi::Placement;
35use osproxy_tenancy::{MigrationError, PartitionState, Phase, PlacementTable};
36use thiserror::Error;
37
/// How long the controller waits after publishing `Cutover` before it lets the
/// pointer flip, so writes admitted just before cutover can finish draining.
///
/// Keep this at or above the sink's upstream write timeout (30s, NFR-R7). The
/// default sits exactly at that timeout; deployments wanting extra safety
/// margin should raise it with [`ControlPlane::with_barrier`].
pub const DEFAULT_DRAIN_BARRIER: Duration = Duration::from_millis(30_000);
42
43/// The backend that holds and transitions the fleet's placement state, the seam
44/// the proxy instances poll for reads and the controller drives for migration.
45///
46/// Implemented in-process by [`PlacementTable`] (and `Arc<PlacementTable>`); a
47/// distributed watched store (etcd/Consul/Redis/OS index) implements the same
48/// contract in M7 without changing the control protocol above it.
49#[allow(
50 async_fn_in_trait,
51 reason = "driven by a single operator/automation controller, not the request \
52 hot path; consumed through generics in ControlPlane (no dyn), where \
53 Send is verified at the await site, mirroring the SPI traits (docs/02 §2). \
54 A distributed backend (etcd/Consul) needs async + fallible I/O here."
55)]
56pub trait MigrationStore {
57 /// Begins migrating `partition` toward `to` (`Active` → `Draining`).
58 ///
59 /// # Errors
60 /// [`MigrationError`] if the partition is unknown or already migrating, or
61 /// [`MigrationError::Backend`] if a distributed backend is unreachable.
62 async fn begin_migration(
63 &self,
64 partition: &PartitionId,
65 to: Placement,
66 ) -> Result<Epoch, MigrationError>;
67
68 /// Moves an in-flight migration into the cutover window (`Draining` →
69 /// `Cutover`); writes are now rejected fleet-wide.
70 ///
71 /// # Errors
72 /// [`MigrationError`] if the partition is not draining (or a backend failure).
73 async fn enter_cutover(&self, partition: &PartitionId) -> Result<Epoch, MigrationError>;
74
75 /// Completes the migration, the pointer flip (`Cutover` → `Active(to)`).
76 ///
77 /// # Errors
78 /// [`MigrationError`] if the partition is not in cutover (or a backend failure).
79 async fn complete_migration(&self, partition: &PartitionId) -> Result<Epoch, MigrationError>;
80
81 /// Aborts an in-flight migration, returning it to its origin.
82 ///
83 /// # Errors
84 /// [`MigrationError`] if the partition is not migrating (or a backend failure).
85 async fn abort_migration(&self, partition: &PartitionId) -> Result<Epoch, MigrationError>;
86
87 /// The partition's current migration state and stamped epoch, or `None`.
88 async fn state(&self, partition: &PartitionId) -> Option<(PartitionState, Epoch)>;
89}
90
91impl MigrationStore for PlacementTable {
92 async fn begin_migration(
93 &self,
94 partition: &PartitionId,
95 to: Placement,
96 ) -> Result<Epoch, MigrationError> {
97 PlacementTable::begin_migration(self, partition, to)
98 }
99 async fn enter_cutover(&self, partition: &PartitionId) -> Result<Epoch, MigrationError> {
100 PlacementTable::enter_cutover(self, partition)
101 }
102 async fn complete_migration(&self, partition: &PartitionId) -> Result<Epoch, MigrationError> {
103 PlacementTable::complete_migration(self, partition)
104 }
105 async fn abort_migration(&self, partition: &PartitionId) -> Result<Epoch, MigrationError> {
106 PlacementTable::abort_migration(self, partition)
107 }
108 async fn state(&self, partition: &PartitionId) -> Option<(PartitionState, Epoch)> {
109 PlacementTable::state(self, partition)
110 }
111}
112
113impl<T: MigrationStore + Sync + ?Sized> MigrationStore for Arc<T> {
114 async fn begin_migration(
115 &self,
116 partition: &PartitionId,
117 to: Placement,
118 ) -> Result<Epoch, MigrationError> {
119 (**self).begin_migration(partition, to).await
120 }
121 async fn enter_cutover(&self, partition: &PartitionId) -> Result<Epoch, MigrationError> {
122 (**self).enter_cutover(partition).await
123 }
124 async fn complete_migration(&self, partition: &PartitionId) -> Result<Epoch, MigrationError> {
125 (**self).complete_migration(partition).await
126 }
127 async fn abort_migration(&self, partition: &PartitionId) -> Result<Epoch, MigrationError> {
128 (**self).abort_migration(partition).await
129 }
130 async fn state(&self, partition: &PartitionId) -> Option<(PartitionState, Epoch)> {
131 (**self).state(partition).await
132 }
133}
134
135/// Why a control-plane operation was refused.
136#[non_exhaustive]
137#[derive(Clone, PartialEq, Eq, Debug, Error)]
138pub enum ControlError {
139 /// The underlying state transition does not apply (wrong phase, unknown
140 /// partition, …).
141 #[error("transition refused: {0}")]
142 Transition(#[from] MigrationError),
143
144 /// `complete_migration` was called before the drain barrier elapsed; the
145 /// controller must wait `remaining` longer so in-flight pre-cutover writes
146 /// drain before the pointer flips.
147 #[error("drain barrier not elapsed; wait {remaining:?} longer")]
148 BarrierPending {
149 /// How much of the barrier is left.
150 remaining: Duration,
151 },
152}
153
154/// Drives a partition through its migration phases against a [`MigrationStore`],
155/// enforcing the drain barrier between cutover and completion (`docs/06` §5).
156pub struct ControlPlane<S> {
157 store: S,
158 clock: Arc<dyn Clock>,
159 barrier: Duration,
160 /// When each partition entered cutover, to time the drain barrier.
161 cutover_at: Mutex<HashMap<PartitionId, Instant>>,
162}
163
164impl<S: std::fmt::Debug> std::fmt::Debug for ControlPlane<S> {
165 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
166 // The injected `Clock` is not `Debug`; the rest is the useful shape.
167 f.debug_struct("ControlPlane")
168 .field("store", &self.store)
169 .field("barrier", &self.barrier)
170 .field("cutover_at", &self.cutover_at)
171 .finish_non_exhaustive()
172 }
173}
174
175impl<S: MigrationStore> ControlPlane<S> {
176 /// Builds a controller over `store` with the default drain barrier and the
177 /// system clock.
178 #[must_use]
179 pub fn new(store: S) -> Self {
180 Self {
181 store,
182 clock: Arc::new(SystemClock),
183 barrier: DEFAULT_DRAIN_BARRIER,
184 cutover_at: Mutex::new(HashMap::new()),
185 }
186 }
187
188 /// Sets the drain barrier (builder style).
189 #[must_use]
190 pub fn with_barrier(mut self, barrier: Duration) -> Self {
191 self.barrier = barrier;
192 self
193 }
194
195 /// Swaps the clock the barrier reads (tests inject a `ManualClock`).
196 #[must_use]
197 pub fn with_clock(mut self, clock: Arc<dyn Clock>) -> Self {
198 self.clock = clock;
199 self
200 }
201
202 /// Begins migrating `partition` toward `to`. Writes keep flowing to the
203 /// origin during the ensuing drain phase.
204 ///
205 /// # Errors
206 /// [`ControlError::Transition`] if the partition is unknown or migrating.
207 pub async fn begin_migration(
208 &self,
209 partition: &PartitionId,
210 to: Placement,
211 ) -> Result<Epoch, ControlError> {
212 Ok(self.store.begin_migration(partition, to).await?)
213 }
214
215 /// Enters the cutover window and starts the drain barrier clock. Writes are
216 /// now rejected fleet-wide (every instance polls this fresh).
217 ///
218 /// # Errors
219 /// [`ControlError::Transition`] if the partition is not draining.
220 pub async fn enter_cutover(&self, partition: &PartitionId) -> Result<Epoch, ControlError> {
221 let epoch = self.store.enter_cutover(partition).await?;
222 self.lock().insert(partition.clone(), self.clock.now());
223 Ok(epoch)
224 }
225
226 /// Completes the migration once the drain barrier has elapsed since cutover,
227 /// the pointer flip. Refused (without mutating the store) while in-flight
228 /// pre-cutover writes might still be committing.
229 ///
230 /// # Errors
231 /// [`ControlError::BarrierPending`] if the barrier has not elapsed;
232 /// [`ControlError::Transition`] if the partition is not in cutover.
233 pub async fn complete_migration(&self, partition: &PartitionId) -> Result<Epoch, ControlError> {
234 let now = self.clock.now();
235 let in_cutover = matches!(
236 self.store.state(partition).await,
237 Some((
238 PartitionState::Migrating {
239 phase: Phase::Cutover,
240 ..
241 },
242 _
243 ))
244 );
245 if in_cutover {
246 // Start the barrier now if this controller did not record cutover
247 // (errs toward waiting rather than flipping early).
248 let started = *self.lock().entry(partition.clone()).or_insert(now);
249 let elapsed = now.saturating_duration_since(started);
250 if elapsed < self.barrier {
251 return Err(ControlError::BarrierPending {
252 remaining: self.barrier.saturating_sub(elapsed),
253 });
254 }
255 }
256 let epoch = self.store.complete_migration(partition).await?;
257 self.lock().remove(partition);
258 Ok(epoch)
259 }
260
261 /// Aborts an in-flight migration, returning the partition to its origin and
262 /// clearing any pending barrier.
263 ///
264 /// # Errors
265 /// [`ControlError::Transition`] if the partition is not migrating.
266 pub async fn abort_migration(&self, partition: &PartitionId) -> Result<Epoch, ControlError> {
267 let epoch = self.store.abort_migration(partition).await?;
268 self.lock().remove(partition);
269 Ok(epoch)
270 }
271
272 /// The partition's current migration state and epoch, or `None`. For
273 /// operator/observability read-out (`docs/06` §5).
274 pub async fn state(&self, partition: &PartitionId) -> Option<(PartitionState, Epoch)> {
275 self.store.state(partition).await
276 }
277
278 /// Locks the cutover-time map, recovering a poisoned lock, it is plain
279 /// timing data with no invariant a panicking holder could tear (NFR-R1).
280 fn lock(&self) -> std::sync::MutexGuard<'_, HashMap<PartitionId, Instant>> {
281 self.cutover_at
282 .lock()
283 .unwrap_or_else(std::sync::PoisonError::into_inner)
284 }
285}