Skip to main content

osproxy_tenancy/
router.rs

//! Adapts a high-level [`TenancySpi`] into the low-level [`RoutingSpi`].
//!
//! This is where declarative tenancy rules become a concrete routing decision:
//! resolve the partition, look up its placement, derive the physical target,
//! and assemble the body transform. Context-derived injected values (from the
//! principal or request headers) are resolved to constants here, so downstream
//! stages stay pure; the partition-id isolation field stays symbolic. The
//! `SharedIndex` partition-in-id invariant (`docs/03`) is also enforced here.
8
9use std::future::Future;
10
11use osproxy_core::{ClusterId, Epoch, IndexName, PartitionId, Target};
12use osproxy_spi::{
13    BodyDoc, BodyTransform, InjectedField, InjectedValue, MigrationPhase, Placement, RequestCtx,
14    RouteDecision, RoutingSpi, SpiError, TenancySpi,
15};
16use serde_json::Value;
17
18/// A fully resolved routing decision plus the partition it was resolved for.
19///
20/// The engine consumes this richer result directly (it needs the partition to
21/// construct the document `_id` and `_routing`); the [`RoutingSpi`] impl exposes
22/// just the [`RouteDecision`].
23#[derive(Clone, PartialEq, Eq, Debug)]
24pub struct Resolved {
25    /// The resolved partition id.
26    pub partition: PartitionId,
27    /// The routing decision derived from the partition's placement.
28    pub decision: RouteDecision,
29    /// The partition's migration phase at resolve time (shape-only, for
30    /// observability, `docs/06` §5).
31    pub migration: MigrationPhase,
32}
33
/// Adapter that exposes a [`TenancySpi`] implementation as a [`RoutingSpi`]
/// (and as the engine-facing [`Router`]).
#[derive(Debug)]
pub struct TenancyRouter<T> {
    /// The tenancy implementation that every routing call delegates to.
    spi: T,
}
39
40impl<T: TenancySpi> TenancyRouter<T> {
41    /// Wraps a tenancy implementation.
42    #[must_use]
43    pub fn new(spi: T) -> Self {
44        Self { spi }
45    }
46
47    /// The wrapped tenancy implementation.
48    #[must_use]
49    pub fn spi(&self) -> &T {
50        &self.spi
51    }
52
53    /// The migration write gate for a resolved decision (`docs/06` §2): whether a
54    /// write that resolved at `epoch` for `partition` may still commit. The
55    /// engine calls this at dispatch; `false` is surfaced as a retryable
56    /// stale-epoch error. Delegates to the [`TenancySpi`].
57    pub async fn admit_write(&self, partition: &PartitionId, epoch: Epoch) -> bool {
58        self.spi.admit_write(partition, epoch).await
59    }
60
61    /// Resolves the full routing plan for `ctx` (the single-document path).
62    ///
63    /// # Errors
64    ///
65    /// Returns [`SpiError`] if the endpoint is not tenancy-aware, the partition
66    /// cannot be resolved, no placement exists, or the configured transforms are
67    /// invalid (e.g. a shared-index id rule that omits the partition).
68    pub async fn resolve(&self, ctx: &RequestCtx<'_>) -> Result<Resolved, SpiError> {
69        if !ctx.endpoint().is_tenancy_aware() {
70            return Err(SpiError::UnsupportedEndpoint {
71                endpoint: ctx.endpoint(),
72            });
73        }
74        // The body is scanned on demand for the partition key, never parsed into
75        // a JSON tree (ADR-014).
76        let partition = self.resolve_partition(ctx, BodyDoc::new(ctx.body()))?;
77        self.resolve_placement(ctx, partition, ctx.logical_index())
78            .await
79    }
80
81    /// Resolves just the partition id for a request and document, without a
82    /// placement lookup. The per-document entry point for bulk demux (`docs/04`
83    /// §3), where each operation carries its own source as a [`BodyDoc`].
84    ///
85    /// # Errors
86    ///
87    /// Returns [`SpiError::PartitionUnresolved`] if no configured source yields
88    /// the partition.
89    pub fn resolve_partition(
90        &self,
91        ctx: &RequestCtx<'_>,
92        body: BodyDoc<'_>,
93    ) -> Result<PartitionId, SpiError> {
94        self.spi.resolve_partition(ctx, body)
95    }
96
97    /// Resolves a known partition to its placement and the routing plan for a
98    /// given logical index. Separated from [`Self::resolve_partition`] so a bulk
99    /// request can resolve the partition per document but cache the placement
100    /// per partition.
101    ///
102    /// # Errors
103    ///
104    /// Returns [`SpiError`] if no placement exists or the configured transforms
105    /// are invalid (e.g. a shared-index id rule that omits the partition).
106    pub async fn resolve_placement(
107        &self,
108        ctx: &RequestCtx<'_>,
109        partition: PartitionId,
110        logical_index: &str,
111    ) -> Result<Resolved, SpiError> {
112        let at = self.spi.placement_for(&partition).await?;
113        // Carry the cluster's endpoint (from the placement result) onto the
114        // target so the sink can pool it, the tenancy is the source of truth for
115        // where each cluster lives.
116        let target = target_for(&at.placement, logical_index).with_endpoint(at.endpoint.clone());
117        let body_transform = self.build_transform(&at.placement, &partition, ctx)?;
118        let decision = RouteDecision {
119            target,
120            upstream_protocol: ctx.protocol(),
121            header_ops: Vec::new(),
122            body_transform,
123            epoch: at.epoch,
124        };
125        Ok(Resolved {
126            partition,
127            decision,
128            migration: at.phase,
129        })
130    }
131
132    /// Builds the body transform for a placement, resolving injected-field
133    /// values and enforcing the shared-index partition-in-id invariant.
134    fn build_transform(
135        &self,
136        placement: &Placement,
137        partition: &PartitionId,
138        ctx: &RequestCtx<'_>,
139    ) -> Result<BodyTransform, SpiError> {
140        let inject = match placement {
141            Placement::SharedIndex { inject, .. } => resolve_inject(inject, partition, ctx)?,
142            Placement::DedicatedCluster { .. } | Placement::DedicatedIndex { .. } => Vec::new(),
143        };
144
145        let id_rule = self.spi.doc_id_rule();
146        // In SharedIndex mode the partition id is MANDATORY in the doc-id template
147        // (docs/03 §4): by-id reads/writes (`_doc/{id}`) bypass the query filter and
148        // hit the physical id directly, so without a partition-scoped id two tenants
149        // collide on the same `_id`, a cross-tenant overwrite on write and a
150        // cross-tenant read on get. A *missing* rule is as unsafe as a partition-free
151        // one, so reject both here rather than only validating a rule that happens to
152        // be present.
153        if let Placement::SharedIndex { .. } = placement {
154            let partition_scoped = id_rule
155                .as_ref()
156                .is_some_and(|rule| rule.template.references_partition());
157            if !partition_scoped {
158                return Err(SpiError::IdRuleMissingPartition);
159            }
160        }
161
162        Ok(match (inject.is_empty(), id_rule) {
163            (true, None) => BodyTransform::None,
164            (false, None) => BodyTransform::Inject(inject),
165            (true, Some(id)) => BodyTransform::ConstructId(id),
166            (false, Some(id)) => BodyTransform::Both { inject, id },
167        })
168    }
169}
170
171impl<T: TenancySpi> RoutingSpi for TenancyRouter<T> {
172    async fn route(&self, ctx: &RequestCtx<'_>) -> Result<RouteDecision, SpiError> {
173        Ok(self.resolve(ctx).await?.decision)
174    }
175}
176
177/// The partition-aware routing seam the engine pipeline drives.
178///
179/// [`RoutingSpi`] yields only a [`RouteDecision`]; the engine needs more, the
180/// resolved partition (to construct `_id`/`_routing` and to demux bulk per
181/// document), the epoch and migration phase (the write gate), and a split
182/// resolve so a bulk request can resolve the partition per document but cache the
183/// placement per partition. This trait captures exactly that contract, so the
184/// pipeline is generic over *any* router that can provide it, not nailed to the
185/// concrete [`TenancyRouter`]. [`TenancyRouter`] is the in-tree implementation.
186pub trait Router: Send + Sync + 'static {
187    /// Resolves the full routing plan for a single-document request.
188    ///
189    /// # Errors
190    ///
191    /// Returns [`SpiError`] if the endpoint is not tenancy-aware, the partition
192    /// cannot be resolved, no placement exists, or the transforms are invalid.
193    fn resolve(
194        &self,
195        ctx: &RequestCtx<'_>,
196    ) -> impl Future<Output = Result<Resolved, SpiError>> + Send;
197
198    /// Resolves just the partition id for a request and document (the bulk demux
199    /// entry point).
200    ///
201    /// # Errors
202    ///
203    /// Returns [`SpiError::PartitionUnresolved`] if no source yields a partition.
204    fn resolve_partition(
205        &self,
206        ctx: &RequestCtx<'_>,
207        body: BodyDoc<'_>,
208    ) -> Result<PartitionId, SpiError>;
209
210    /// Resolves a known partition to its placement and routing plan.
211    ///
212    /// # Errors
213    ///
214    /// Returns [`SpiError`] if no placement exists or the transforms are invalid.
215    fn resolve_placement(
216        &self,
217        ctx: &RequestCtx<'_>,
218        partition: PartitionId,
219        logical_index: &str,
220    ) -> impl Future<Output = Result<Resolved, SpiError>> + Send;
221
222    /// The migration write gate: may a write that resolved at `epoch` for
223    /// `partition` still commit? `false` ⇒ reject as a retryable stale-epoch error.
224    fn admit_write(
225        &self,
226        partition: &PartitionId,
227        epoch: Epoch,
228    ) -> impl Future<Output = bool> + Send;
229
230    /// The base URL of a cluster by id, for the cursor-affinity and admin paths
231    /// that route by cluster without a placement. Default `None`.
232    fn cluster_endpoint(&self, _cluster: &ClusterId) -> Option<String> {
233        None
234    }
235}
236
237impl<T: TenancySpi> Router for TenancyRouter<T> {
238    async fn resolve(&self, ctx: &RequestCtx<'_>) -> Result<Resolved, SpiError> {
239        TenancyRouter::resolve(self, ctx).await
240    }
241
242    fn resolve_partition(
243        &self,
244        ctx: &RequestCtx<'_>,
245        body: BodyDoc<'_>,
246    ) -> Result<PartitionId, SpiError> {
247        TenancyRouter::resolve_partition(self, ctx, body)
248    }
249
250    async fn resolve_placement(
251        &self,
252        ctx: &RequestCtx<'_>,
253        partition: PartitionId,
254        logical_index: &str,
255    ) -> Result<Resolved, SpiError> {
256        TenancyRouter::resolve_placement(self, ctx, partition, logical_index).await
257    }
258
259    async fn admit_write(&self, partition: &PartitionId, epoch: Epoch) -> bool {
260        TenancyRouter::admit_write(self, partition, epoch).await
261    }
262
263    fn cluster_endpoint(&self, cluster: &ClusterId) -> Option<String> {
264        self.spi.cluster_endpoint(cluster)
265    }
266}
267
268/// Derives the physical [`Target`] from a placement and the request's logical
269/// index. A dedicated cluster carries the logical index name unchanged; the
270/// other modes pin a concrete physical index.
271fn target_for(placement: &Placement, logical_index: &str) -> Target {
272    match placement {
273        Placement::DedicatedCluster { cluster } => {
274            Target::new(cluster.clone(), IndexName::from(logical_index))
275        }
276        Placement::DedicatedIndex { cluster, index }
277        | Placement::SharedIndex { cluster, index, .. } => {
278            Target::new(cluster.clone(), index.clone())
279        }
280    }
281}
282
283/// Resolves the *context-derived* injected values to constants, using the
284/// request. The `PartitionId` value is left as-is: it is the read-isolation key,
285/// and downstream stages resolve it to the partition, so the read path can tell
286/// the isolation field apart from the decorative (context-derived) ones.
287fn resolve_inject(
288    fields: &[InjectedField],
289    _partition: &PartitionId,
290    ctx: &RequestCtx<'_>,
291) -> Result<Vec<InjectedField>, SpiError> {
292    fields
293        .iter()
294        .map(|field| {
295            let value = match &field.value {
296                // The isolation field stays symbolic; never filtered on a
297                // context-derived value (which would differ on read).
298                InjectedValue::PartitionId => return Ok(field.clone()),
299                InjectedValue::Constant(constant) => constant.clone(),
300                InjectedValue::FromPrincipal(attr) => ctx
301                    .principal()
302                    .attr(attr)
303                    .map(|v| Value::String(v.to_owned()))
304                    .ok_or_else(|| SpiError::PrincipalAttrMissing { attr: attr.clone() })?,
305                InjectedValue::FromHeader(name) => ctx
306                    .headers()
307                    .get(name)
308                    .map(|v| Value::String(v.to_owned()))
309                    .ok_or_else(|| SpiError::HeaderMissing {
310                        header: name.clone(),
311                    })?,
312            };
313            Ok(InjectedField::new(
314                field.name.clone(),
315                InjectedValue::Constant(value),
316            ))
317        })
318        .collect()
319}
320
// Unit tests for this module live in the sibling `router_tests.rs` file and
// are compiled only for test builds.
#[path = "router_tests.rs"]
#[cfg(test)]
mod tests;