Skip to main content

osproxy_control/
migration.rs

//! The fleet-safe migration control plane (`docs/06` §5).
//!
//! The proxy runs as **many instances**, each resolving placement and the write
//! gate by polling the shared backend *fresh on every request*; nothing about a
//! migration is cached in an instance, so the backend is the single synchronized
//! source of truth (the in-memory [`PlacementTable`] here; a watched store such
//! as etcd/Consul in M7, behind [`MigrationStore`]).
//!
//! That makes the routing flip safe *except* for one residual window: a write
//! whose gate passed an instant **before** cutover was published may still be
//! committing upstream. So the controller does not flip the pointer immediately.
//! After publishing `Cutover` it holds a **drain barrier** (by default
//! [`DEFAULT_DRAIN_BARRIER`]; it must be ≥ the upstream write timeout) before
//! `complete_migration` is allowed. By then every pre-cutover write has either
//! committed or hit its deadline, so no in-flight write can land in the old
//! placement after the flip (INV-M1, INV-M2 fleet-wide).
//!
//! Time comes from an injected [`Clock`], so the barrier is deterministic in
//! tests. One controller drives a given partition's migration (`docs/06` §5:
//! operator/automation-driven, never AI-mutated).
//!
//! The [`MigrationStore`] seam is **async and fallible**. The in-process
//! [`PlacementTable`] resolves transitions synchronously and never fails on the
//! backend. A distributed backend (etcd/Consul), however, does network I/O and may
//! report the backend as unreachable via [`MigrationError::Backend`]. The
//! [`ControlPlane`] treats such a failure as a refused transition that leaves the
//! partition unchanged, never a half-applied flip.
28
29use std::collections::HashMap;
30use std::sync::{Arc, Mutex};
31use std::time::Duration;
32
33use osproxy_core::{Clock, Epoch, Instant, PartitionId, SystemClock};
34use osproxy_spi::Placement;
35use osproxy_tenancy::{MigrationError, PartitionState, Phase, PlacementTable};
36use thiserror::Error;
37
/// Default drain barrier: how long the controller waits after publishing
/// `Cutover` before it lets `complete_migration` flip the pointer, so writes
/// admitted just before cutover can finish.
///
/// The barrier must be at least the sink's upstream write timeout (30s,
/// NFR-R7). This default equals that timeout exactly, with no extra margin;
/// configure a longer barrier when more safety headroom is wanted.
pub const DEFAULT_DRAIN_BARRIER: Duration = Duration::from_millis(30_000);
42
43/// The backend that holds and transitions the fleet's placement state, the seam
44/// the proxy instances poll for reads and the controller drives for migration.
45///
46/// Implemented in-process by [`PlacementTable`] (and `Arc<PlacementTable>`); a
47/// distributed watched store (etcd/Consul/Redis/OS index) implements the same
48/// contract in M7 without changing the control protocol above it.
49#[allow(
50    async_fn_in_trait,
51    reason = "driven by a single operator/automation controller, not the request \
52              hot path; consumed through generics in ControlPlane (no dyn), where \
53              Send is verified at the await site, mirroring the SPI traits (docs/02 §2). \
54              A distributed backend (etcd/Consul) needs async + fallible I/O here."
55)]
56pub trait MigrationStore {
57    /// Begins migrating `partition` toward `to` (`Active` → `Draining`).
58    ///
59    /// # Errors
60    /// [`MigrationError`] if the partition is unknown or already migrating, or
61    /// [`MigrationError::Backend`] if a distributed backend is unreachable.
62    async fn begin_migration(
63        &self,
64        partition: &PartitionId,
65        to: Placement,
66    ) -> Result<Epoch, MigrationError>;
67
68    /// Moves an in-flight migration into the cutover window (`Draining` →
69    /// `Cutover`); writes are now rejected fleet-wide.
70    ///
71    /// # Errors
72    /// [`MigrationError`] if the partition is not draining (or a backend failure).
73    async fn enter_cutover(&self, partition: &PartitionId) -> Result<Epoch, MigrationError>;
74
75    /// Completes the migration, the pointer flip (`Cutover` → `Active(to)`).
76    ///
77    /// # Errors
78    /// [`MigrationError`] if the partition is not in cutover (or a backend failure).
79    async fn complete_migration(&self, partition: &PartitionId) -> Result<Epoch, MigrationError>;
80
81    /// Aborts an in-flight migration, returning it to its origin.
82    ///
83    /// # Errors
84    /// [`MigrationError`] if the partition is not migrating (or a backend failure).
85    async fn abort_migration(&self, partition: &PartitionId) -> Result<Epoch, MigrationError>;
86
87    /// The partition's current migration state and stamped epoch, or `None`.
88    async fn state(&self, partition: &PartitionId) -> Option<(PartitionState, Epoch)>;
89}
90
91impl MigrationStore for PlacementTable {
92    async fn begin_migration(
93        &self,
94        partition: &PartitionId,
95        to: Placement,
96    ) -> Result<Epoch, MigrationError> {
97        PlacementTable::begin_migration(self, partition, to)
98    }
99    async fn enter_cutover(&self, partition: &PartitionId) -> Result<Epoch, MigrationError> {
100        PlacementTable::enter_cutover(self, partition)
101    }
102    async fn complete_migration(&self, partition: &PartitionId) -> Result<Epoch, MigrationError> {
103        PlacementTable::complete_migration(self, partition)
104    }
105    async fn abort_migration(&self, partition: &PartitionId) -> Result<Epoch, MigrationError> {
106        PlacementTable::abort_migration(self, partition)
107    }
108    async fn state(&self, partition: &PartitionId) -> Option<(PartitionState, Epoch)> {
109        PlacementTable::state(self, partition)
110    }
111}
112
113impl<T: MigrationStore + Sync + ?Sized> MigrationStore for Arc<T> {
114    async fn begin_migration(
115        &self,
116        partition: &PartitionId,
117        to: Placement,
118    ) -> Result<Epoch, MigrationError> {
119        (**self).begin_migration(partition, to).await
120    }
121    async fn enter_cutover(&self, partition: &PartitionId) -> Result<Epoch, MigrationError> {
122        (**self).enter_cutover(partition).await
123    }
124    async fn complete_migration(&self, partition: &PartitionId) -> Result<Epoch, MigrationError> {
125        (**self).complete_migration(partition).await
126    }
127    async fn abort_migration(&self, partition: &PartitionId) -> Result<Epoch, MigrationError> {
128        (**self).abort_migration(partition).await
129    }
130    async fn state(&self, partition: &PartitionId) -> Option<(PartitionState, Epoch)> {
131        (**self).state(partition).await
132    }
133}
134
135/// Why a control-plane operation was refused.
136#[non_exhaustive]
137#[derive(Clone, PartialEq, Eq, Debug, Error)]
138pub enum ControlError {
139    /// The underlying state transition does not apply (wrong phase, unknown
140    /// partition, …).
141    #[error("transition refused: {0}")]
142    Transition(#[from] MigrationError),
143
144    /// `complete_migration` was called before the drain barrier elapsed; the
145    /// controller must wait `remaining` longer so in-flight pre-cutover writes
146    /// drain before the pointer flips.
147    #[error("drain barrier not elapsed; wait {remaining:?} longer")]
148    BarrierPending {
149        /// How much of the barrier is left.
150        remaining: Duration,
151    },
152}
153
154/// Drives a partition through its migration phases against a [`MigrationStore`],
155/// enforcing the drain barrier between cutover and completion (`docs/06` §5).
156pub struct ControlPlane<S> {
157    store: S,
158    clock: Arc<dyn Clock>,
159    barrier: Duration,
160    /// When each partition entered cutover, to time the drain barrier.
161    cutover_at: Mutex<HashMap<PartitionId, Instant>>,
162}
163
164impl<S: std::fmt::Debug> std::fmt::Debug for ControlPlane<S> {
165    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
166        // The injected `Clock` is not `Debug`; the rest is the useful shape.
167        f.debug_struct("ControlPlane")
168            .field("store", &self.store)
169            .field("barrier", &self.barrier)
170            .field("cutover_at", &self.cutover_at)
171            .finish_non_exhaustive()
172    }
173}
174
175impl<S: MigrationStore> ControlPlane<S> {
176    /// Builds a controller over `store` with the default drain barrier and the
177    /// system clock.
178    #[must_use]
179    pub fn new(store: S) -> Self {
180        Self {
181            store,
182            clock: Arc::new(SystemClock),
183            barrier: DEFAULT_DRAIN_BARRIER,
184            cutover_at: Mutex::new(HashMap::new()),
185        }
186    }
187
188    /// Sets the drain barrier (builder style).
189    #[must_use]
190    pub fn with_barrier(mut self, barrier: Duration) -> Self {
191        self.barrier = barrier;
192        self
193    }
194
195    /// Swaps the clock the barrier reads (tests inject a `ManualClock`).
196    #[must_use]
197    pub fn with_clock(mut self, clock: Arc<dyn Clock>) -> Self {
198        self.clock = clock;
199        self
200    }
201
202    /// Begins migrating `partition` toward `to`. Writes keep flowing to the
203    /// origin during the ensuing drain phase.
204    ///
205    /// # Errors
206    /// [`ControlError::Transition`] if the partition is unknown or migrating.
207    pub async fn begin_migration(
208        &self,
209        partition: &PartitionId,
210        to: Placement,
211    ) -> Result<Epoch, ControlError> {
212        Ok(self.store.begin_migration(partition, to).await?)
213    }
214
215    /// Enters the cutover window and starts the drain barrier clock. Writes are
216    /// now rejected fleet-wide (every instance polls this fresh).
217    ///
218    /// # Errors
219    /// [`ControlError::Transition`] if the partition is not draining.
220    pub async fn enter_cutover(&self, partition: &PartitionId) -> Result<Epoch, ControlError> {
221        let epoch = self.store.enter_cutover(partition).await?;
222        self.lock().insert(partition.clone(), self.clock.now());
223        Ok(epoch)
224    }
225
226    /// Completes the migration once the drain barrier has elapsed since cutover,
227    /// the pointer flip. Refused (without mutating the store) while in-flight
228    /// pre-cutover writes might still be committing.
229    ///
230    /// # Errors
231    /// [`ControlError::BarrierPending`] if the barrier has not elapsed;
232    /// [`ControlError::Transition`] if the partition is not in cutover.
233    pub async fn complete_migration(&self, partition: &PartitionId) -> Result<Epoch, ControlError> {
234        let now = self.clock.now();
235        let in_cutover = matches!(
236            self.store.state(partition).await,
237            Some((
238                PartitionState::Migrating {
239                    phase: Phase::Cutover,
240                    ..
241                },
242                _
243            ))
244        );
245        if in_cutover {
246            // Start the barrier now if this controller did not record cutover
247            // (errs toward waiting rather than flipping early).
248            let started = *self.lock().entry(partition.clone()).or_insert(now);
249            let elapsed = now.saturating_duration_since(started);
250            if elapsed < self.barrier {
251                return Err(ControlError::BarrierPending {
252                    remaining: self.barrier.saturating_sub(elapsed),
253                });
254            }
255        }
256        let epoch = self.store.complete_migration(partition).await?;
257        self.lock().remove(partition);
258        Ok(epoch)
259    }
260
261    /// Aborts an in-flight migration, returning the partition to its origin and
262    /// clearing any pending barrier.
263    ///
264    /// # Errors
265    /// [`ControlError::Transition`] if the partition is not migrating.
266    pub async fn abort_migration(&self, partition: &PartitionId) -> Result<Epoch, ControlError> {
267        let epoch = self.store.abort_migration(partition).await?;
268        self.lock().remove(partition);
269        Ok(epoch)
270    }
271
272    /// The partition's current migration state and epoch, or `None`. For
273    /// operator/observability read-out (`docs/06` §5).
274    pub async fn state(&self, partition: &PartitionId) -> Option<(PartitionState, Epoch)> {
275        self.store.state(partition).await
276    }
277
278    /// Locks the cutover-time map, recovering a poisoned lock, it is plain
279    /// timing data with no invariant a panicking holder could tear (NFR-R1).
280    fn lock(&self) -> std::sync::MutexGuard<'_, HashMap<PartitionId, Instant>> {
281        self.cutover_at
282            .lock()
283            .unwrap_or_else(std::sync::PoisonError::into_inner)
284    }
285}