Skip to main content

osproxy_spi/
placement.rs

1//! Where a partition currently lives.
2//!
3//! A [`Placement`] is the resolved home of a partition; [`PlacementAt`] pairs it
4//! with the [`Epoch`] it was read at, so a write can be epoch-stamped and the
5//! sink can reject a stale-epoch write during a migration (`docs/03`, `docs/06`).
6
7use osproxy_core::{ClusterId, Epoch, IndexName};
8
9use crate::rules::InjectedField;
10
11/// The resolved home of a partition.
12///
13/// The three modes trade isolation against density (`docs/03` §3):
14/// - `DedicatedCluster`: the partition owns a whole cluster (its index name is
15///   carried unchanged from the request's logical index).
16/// - `DedicatedIndex`: the partition owns a physical index on a shared cluster.
17/// - `SharedIndex`: many partitions share one physical index; isolation is
18///   enforced by injected partition fields (whose names the SPI chose) plus a
19///   partition filter on read.
20///
21/// Deliberately *not* `#[non_exhaustive]`: the proxy core must interpret every
22/// placement mode to route correctly, so adding a mode should force every match
23/// in the workspace to be updated rather than silently fall through (`docs/03`).
24///
25/// # Examples
26///
27/// ```
28/// use osproxy_spi::Placement;
29/// use osproxy_spi::core::{ClusterId, IndexName};
30///
31/// let p = Placement::SharedIndex {
32///     cluster: ClusterId::from("eu-1"),
33///     index: IndexName::from("shared"),
34///     inject: vec![],
35/// };
36/// assert_eq!(p.cluster().as_str(), "eu-1");
37/// ```
38#[derive(Clone, PartialEq, Eq, Debug)]
39pub enum Placement {
40    /// The partition has a dedicated cluster.
41    DedicatedCluster {
42        /// The cluster that exclusively serves this partition.
43        cluster: ClusterId,
44    },
45    /// The partition has a dedicated index on a shared cluster.
46    DedicatedIndex {
47        /// The hosting cluster.
48        cluster: ClusterId,
49        /// The physical index for this partition.
50        index: IndexName,
51    },
52    /// The partition shares a physical index with others, isolated by the
53    /// injected fields named here.
54    SharedIndex {
55        /// The hosting cluster.
56        cluster: ClusterId,
57        /// The shared physical index.
58        index: IndexName,
59        /// Fields injected on ingest and stripped on read to isolate tenants.
60        inject: Vec<InjectedField>,
61    },
62}
63
64impl Placement {
65    /// The cluster this placement resolves to, regardless of mode.
66    #[must_use]
67    pub fn cluster(&self) -> &ClusterId {
68        match self {
69            Self::DedicatedCluster { cluster }
70            | Self::DedicatedIndex { cluster, .. }
71            | Self::SharedIndex { cluster, .. } => cluster,
72        }
73    }
74}
75
/// The migration phase a partition was in when its placement was read.
///
/// This is a shape-only label (never tenant data), so observability can show
/// how far along a migration is (`docs/06` §5).
///
/// # Examples
///
/// ```
/// use osproxy_spi::MigrationPhase;
/// assert_eq!(MigrationPhase::default(), MigrationPhase::Settled);
/// assert_eq!(MigrationPhase::Cutover.as_str(), "cutover");
///
/// // The label is available in constant contexts as well.
/// const LABEL: &str = MigrationPhase::Draining.as_str();
/// assert_eq!(LABEL, "draining");
/// ```
#[derive(Clone, Copy, PartialEq, Eq, Debug, Default)]
pub enum MigrationPhase {
    /// Not migrating; the placement is settled.
    #[default]
    Settled,
    /// Migrating, copy phase: writes still go to the origin.
    Draining,
    /// Migrating, cutover window: writes are held (stale-epoch retry).
    Cutover,
}

impl MigrationPhase {
    /// Returns the stable lowercase label used for this phase in telemetry.
    ///
    /// This is a `const fn`, so the label can also initialise `const` and
    /// `static` items.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        match self {
            Self::Settled => "settled",
            Self::Draining => "draining",
            Self::Cutover => "cutover",
        }
    }
}
108
109/// A [`Placement`] together with the placement-table epoch it was read at and the
110/// partition's migration phase.
111///
112/// The epoch flows into the routing decision and onto the write so migration
113/// cutover can detect a write resolved against a superseded placement
114/// (`docs/06` §2); the phase is shape-only context for observability.
115///
116/// # Examples
117///
118/// ```
119/// use osproxy_spi::{Placement, PlacementAt, MigrationPhase};
120/// use osproxy_spi::core::{ClusterId, Epoch};
121///
122/// let at = PlacementAt::new(
123///     Placement::DedicatedCluster { cluster: ClusterId::from("eu-1") },
124///     Epoch::new(7),
125/// )
126/// .with_phase(MigrationPhase::Draining);
127/// assert_eq!(at.epoch, Epoch::new(7));
128/// assert_eq!(at.phase, MigrationPhase::Draining);
129/// ```
130#[derive(Clone, PartialEq, Eq, Debug)]
131pub struct PlacementAt {
132    /// The resolved placement.
133    pub placement: Placement,
134    /// The epoch the placement table was at when this was read.
135    pub epoch: Epoch,
136    /// The partition's migration phase at read time.
137    pub phase: MigrationPhase,
138    /// The base URL of the placement's cluster. The tenancy is the source of
139    /// truth for where each cluster lives; the sink builds a pool for this URL
140    /// the first time it routes to the cluster. Required to reach a live cluster
141    /// (an in-memory sink ignores it).
142    pub endpoint: Option<String>,
143}
144
145impl PlacementAt {
146    /// Pairs a placement with the epoch it was read at (settled, not migrating,
147    /// no endpoint).
148    #[must_use]
149    pub fn new(placement: Placement, epoch: Epoch) -> Self {
150        Self {
151            placement,
152            epoch,
153            phase: MigrationPhase::Settled,
154            endpoint: None,
155        }
156    }
157
158    /// Sets the migration phase (builder style).
159    #[must_use]
160    pub fn with_phase(mut self, phase: MigrationPhase) -> Self {
161        self.phase = phase;
162        self
163    }
164
165    /// Sets the cluster's base URL (builder style). This is how the tenancy tells
166    /// the proxy where the placement's cluster lives, e.g.
167    /// `.with_endpoint("https://eu-1.internal:9200")`.
168    #[must_use]
169    pub fn with_endpoint(mut self, endpoint: impl Into<String>) -> Self {
170        self.endpoint = Some(endpoint.into());
171        self
172    }
173}
174
#[cfg(test)]
mod tests {
    use super::*;

    /// `Placement::cluster` returns the cluster id for each of the three modes.
    #[test]
    fn cluster_is_extracted_for_every_mode() {
        let dc = Placement::DedicatedCluster {
            cluster: ClusterId::from("c1"),
        };
        let di = Placement::DedicatedIndex {
            cluster: ClusterId::from("c2"),
            index: IndexName::from("i"),
        };
        let si = Placement::SharedIndex {
            cluster: ClusterId::from("c3"),
            index: IndexName::from("shared"),
            inject: Vec::new(),
        };
        assert_eq!(dc.cluster().as_str(), "c1");
        assert_eq!(di.cluster().as_str(), "c2");
        assert_eq!(si.cluster().as_str(), "c3");
    }

    /// `PlacementAt::new` keeps the epoch and placement, and starts settled
    /// with no endpoint.
    #[test]
    fn placement_at_pairs_epoch() {
        let at = PlacementAt::new(
            Placement::DedicatedCluster {
                cluster: ClusterId::from("c"),
            },
            Epoch::new(5),
        );
        assert_eq!(at.epoch, Epoch::new(5));
        assert_eq!(at.placement.cluster().as_str(), "c");
        assert_eq!(at.phase, MigrationPhase::Settled);
        assert_eq!(at.endpoint, None);
    }

    /// The builders set only their own field and leave the rest untouched.
    #[test]
    fn builders_set_phase_and_endpoint() {
        let at = PlacementAt::new(
            Placement::DedicatedCluster {
                cluster: ClusterId::from("c"),
            },
            Epoch::new(9),
        )
        .with_phase(MigrationPhase::Cutover)
        .with_endpoint("https://eu-1.internal:9200");

        assert_eq!(at.phase, MigrationPhase::Cutover);
        assert_eq!(at.endpoint.as_deref(), Some("https://eu-1.internal:9200"));
        assert_eq!(at.epoch, Epoch::new(9));
        assert_eq!(at.placement.cluster().as_str(), "c");
    }

    /// Telemetry labels are stable and the default phase is `Settled`.
    #[test]
    fn migration_phase_labels_are_stable() {
        assert_eq!(MigrationPhase::default(), MigrationPhase::Settled);
        assert_eq!(MigrationPhase::Settled.as_str(), "settled");
        assert_eq!(MigrationPhase::Draining.as_str(), "draining");
        assert_eq!(MigrationPhase::Cutover.as_str(), "cutover");
    }
}