Skip to main content

osproxy_tenancy/
router.rs

1//! Adapts a high-level [`TenancySpi`] into the low-level [`RoutingSpi`].
2//!
3//! This is where declarative tenancy rules become a concrete routing decision:
4//! resolve the partition, look up its placement, derive the physical target,
5//! and assemble the body transform (with injected-field values already resolved
6//! to constants, so downstream stages stay pure). The `SharedIndex`
7//! partition-in-id invariant (`docs/03`) is enforced here.
8
9use osproxy_core::{ClusterId, Epoch, IndexName, PartitionId, Target};
10use osproxy_spi::{
11    BodyDoc, BodyTransform, InjectedField, InjectedValue, MigrationPhase, Placement, RequestCtx,
12    RouteDecision, RoutingSpi, SpiError, TenancySpi,
13};
14use serde_json::Value;
15
16/// A fully resolved routing decision plus the partition it was resolved for.
17///
18/// The engine consumes this richer result directly (it needs the partition to
19/// construct the document `_id` and `_routing`); the [`RoutingSpi`] impl exposes
20/// just the [`RouteDecision`].
21#[derive(Clone, PartialEq, Eq, Debug)]
22pub struct Resolved {
23    /// The resolved partition id.
24    pub partition: PartitionId,
25    /// The routing decision derived from the partition's placement.
26    pub decision: RouteDecision,
27    /// The partition's migration phase at resolve time (shape-only, for
28    /// observability, `docs/06` §5).
29    pub migration: MigrationPhase,
30}
31
/// Adapter that exposes a [`TenancySpi`] implementation as a [`RoutingSpi`].
///
/// The wrapped implementation is the only state; it is reachable through
/// `TenancyRouter::spi`.
#[derive(Debug)]
pub struct TenancyRouter<T> {
    /// The wrapped tenancy implementation every call is delegated to.
    spi: T,
}
37
38impl<T: TenancySpi> TenancyRouter<T> {
39    /// Wraps a tenancy implementation.
40    #[must_use]
41    pub fn new(spi: T) -> Self {
42        Self { spi }
43    }
44
45    /// The wrapped tenancy implementation.
46    #[must_use]
47    pub fn spi(&self) -> &T {
48        &self.spi
49    }
50
51    /// The migration write gate for a resolved decision (`docs/06` §2): whether a
52    /// write that resolved at `epoch` for `partition` may still commit. The
53    /// engine calls this at dispatch; `false` is surfaced as a retryable
54    /// stale-epoch error. Delegates to the [`TenancySpi`].
55    pub async fn admit_write(&self, partition: &PartitionId, epoch: Epoch) -> bool {
56        self.spi.admit_write(partition, epoch).await
57    }
58
59    /// Resolves the full routing plan for `ctx` (the single-document path).
60    ///
61    /// # Errors
62    ///
63    /// Returns [`SpiError`] if the endpoint is not tenancy-aware, the partition
64    /// cannot be resolved, no placement exists, or the configured transforms are
65    /// invalid (e.g. a shared-index id rule that omits the partition).
66    pub async fn resolve(&self, ctx: &RequestCtx<'_>) -> Result<Resolved, SpiError> {
67        if !ctx.endpoint().is_tenancy_aware() {
68            return Err(SpiError::UnsupportedEndpoint {
69                endpoint: ctx.endpoint(),
70            });
71        }
72        // The body is scanned on demand for the partition key, never parsed into
73        // a JSON tree (ADR-014).
74        let partition = self.resolve_partition(ctx, BodyDoc::new(ctx.body()))?;
75        self.resolve_placement(ctx, partition, ctx.logical_index())
76            .await
77    }
78
79    /// Resolves just the partition id for a request and document, without a
80    /// placement lookup. The per-document entry point for bulk demux (`docs/04`
81    /// §3), where each operation carries its own source as a [`BodyDoc`].
82    ///
83    /// # Errors
84    ///
85    /// Returns [`SpiError::PartitionUnresolved`] if no configured source yields
86    /// the partition.
87    pub fn resolve_partition(
88        &self,
89        ctx: &RequestCtx<'_>,
90        body: BodyDoc<'_>,
91    ) -> Result<PartitionId, SpiError> {
92        self.spi.resolve_partition(ctx, body)
93    }
94
95    /// Resolves a known partition to its placement and the routing plan for a
96    /// given logical index. Separated from [`Self::resolve_partition`] so a bulk
97    /// request can resolve the partition per document but cache the placement
98    /// per partition.
99    ///
100    /// # Errors
101    ///
102    /// Returns [`SpiError`] if no placement exists or the configured transforms
103    /// are invalid (e.g. a shared-index id rule that omits the partition).
104    pub async fn resolve_placement(
105        &self,
106        ctx: &RequestCtx<'_>,
107        partition: PartitionId,
108        logical_index: &str,
109    ) -> Result<Resolved, SpiError> {
110        let at = self.spi.placement_for(&partition).await?;
111        // Carry the cluster's endpoint (from the placement result) onto the
112        // target so the sink can pool it, the tenancy is the source of truth for
113        // where each cluster lives.
114        let target = target_for(&at.placement, logical_index).with_endpoint(at.endpoint.clone());
115        let body_transform = self.build_transform(&at.placement, &partition, ctx)?;
116        let decision = RouteDecision {
117            target,
118            upstream_protocol: ctx.protocol(),
119            header_ops: Vec::new(),
120            body_transform,
121            epoch: at.epoch,
122        };
123        Ok(Resolved {
124            partition,
125            decision,
126            migration: at.phase,
127        })
128    }
129
130    /// Builds the body transform for a placement, resolving injected-field
131    /// values and enforcing the shared-index partition-in-id invariant.
132    fn build_transform(
133        &self,
134        placement: &Placement,
135        partition: &PartitionId,
136        ctx: &RequestCtx<'_>,
137    ) -> Result<BodyTransform, SpiError> {
138        let inject = match placement {
139            Placement::SharedIndex { inject, .. } => resolve_inject(inject, partition, ctx)?,
140            Placement::DedicatedCluster { .. } | Placement::DedicatedIndex { .. } => Vec::new(),
141        };
142
143        let id_rule = self.spi.doc_id_rule();
144        // In SharedIndex mode the partition id is MANDATORY in the doc-id template
145        // (docs/03 §4): by-id reads/writes (`_doc/{id}`) bypass the query filter and
146        // hit the physical id directly, so without a partition-scoped id two tenants
147        // collide on the same `_id`, a cross-tenant overwrite on write and a
148        // cross-tenant read on get. A *missing* rule is as unsafe as a partition-free
149        // one, so reject both here rather than only validating a rule that happens to
150        // be present.
151        if let Placement::SharedIndex { .. } = placement {
152            let partition_scoped = id_rule
153                .as_ref()
154                .is_some_and(|rule| rule.template.references_partition());
155            if !partition_scoped {
156                return Err(SpiError::IdRuleMissingPartition);
157            }
158        }
159
160        Ok(match (inject.is_empty(), id_rule) {
161            (true, None) => BodyTransform::None,
162            (false, None) => BodyTransform::Inject(inject),
163            (true, Some(id)) => BodyTransform::ConstructId(id),
164            (false, Some(id)) => BodyTransform::Both { inject, id },
165        })
166    }
167}
168
169impl<T: TenancySpi> RoutingSpi for TenancyRouter<T> {
170    async fn route(&self, ctx: &RequestCtx<'_>) -> Result<RouteDecision, SpiError> {
171        Ok(self.resolve(ctx).await?.decision)
172    }
173}
174
175/// The partition-aware routing seam the engine pipeline drives.
176///
177/// [`RoutingSpi`] yields only a [`RouteDecision`]; the engine needs more, the
178/// resolved partition (to construct `_id`/`_routing` and to demux bulk per
179/// document), the epoch and migration phase (the write gate), and a split
180/// resolve so a bulk request can resolve the partition per document but cache the
181/// placement per partition. This trait captures exactly that contract, so the
182/// pipeline is generic over *any* router that can provide it, not nailed to the
183/// concrete [`TenancyRouter`]. [`TenancyRouter`] is the in-tree implementation.
184#[allow(
185    async_fn_in_trait,
186    reason = "consumed through generics in the engine, where Send is verified at \
187              the spawn site, mirroring TenancySpi/RoutingSpi (docs/02 §2)"
188)]
189pub trait Router: Send + Sync + 'static {
190    /// Resolves the full routing plan for a single-document request.
191    ///
192    /// # Errors
193    ///
194    /// Returns [`SpiError`] if the endpoint is not tenancy-aware, the partition
195    /// cannot be resolved, no placement exists, or the transforms are invalid.
196    async fn resolve(&self, ctx: &RequestCtx<'_>) -> Result<Resolved, SpiError>;
197
198    /// Resolves just the partition id for a request and document (the bulk demux
199    /// entry point).
200    ///
201    /// # Errors
202    ///
203    /// Returns [`SpiError::PartitionUnresolved`] if no source yields a partition.
204    fn resolve_partition(
205        &self,
206        ctx: &RequestCtx<'_>,
207        body: BodyDoc<'_>,
208    ) -> Result<PartitionId, SpiError>;
209
210    /// Resolves a known partition to its placement and routing plan.
211    ///
212    /// # Errors
213    ///
214    /// Returns [`SpiError`] if no placement exists or the transforms are invalid.
215    async fn resolve_placement(
216        &self,
217        ctx: &RequestCtx<'_>,
218        partition: PartitionId,
219        logical_index: &str,
220    ) -> Result<Resolved, SpiError>;
221
222    /// The migration write gate: may a write that resolved at `epoch` for
223    /// `partition` still commit? `false` ⇒ reject as a retryable stale-epoch error.
224    async fn admit_write(&self, partition: &PartitionId, epoch: Epoch) -> bool;
225
226    /// The base URL of a cluster by id, for the cursor-affinity and admin paths
227    /// that route by cluster without a placement. Default `None`.
228    fn cluster_endpoint(&self, _cluster: &ClusterId) -> Option<String> {
229        None
230    }
231}
232
233impl<T: TenancySpi> Router for TenancyRouter<T> {
234    async fn resolve(&self, ctx: &RequestCtx<'_>) -> Result<Resolved, SpiError> {
235        TenancyRouter::resolve(self, ctx).await
236    }
237
238    fn resolve_partition(
239        &self,
240        ctx: &RequestCtx<'_>,
241        body: BodyDoc<'_>,
242    ) -> Result<PartitionId, SpiError> {
243        TenancyRouter::resolve_partition(self, ctx, body)
244    }
245
246    async fn resolve_placement(
247        &self,
248        ctx: &RequestCtx<'_>,
249        partition: PartitionId,
250        logical_index: &str,
251    ) -> Result<Resolved, SpiError> {
252        TenancyRouter::resolve_placement(self, ctx, partition, logical_index).await
253    }
254
255    async fn admit_write(&self, partition: &PartitionId, epoch: Epoch) -> bool {
256        TenancyRouter::admit_write(self, partition, epoch).await
257    }
258
259    fn cluster_endpoint(&self, cluster: &ClusterId) -> Option<String> {
260        self.spi.cluster_endpoint(cluster)
261    }
262}
263
264/// Derives the physical [`Target`] from a placement and the request's logical
265/// index. A dedicated cluster carries the logical index name unchanged; the
266/// other modes pin a concrete physical index.
267fn target_for(placement: &Placement, logical_index: &str) -> Target {
268    match placement {
269        Placement::DedicatedCluster { cluster } => {
270            Target::new(cluster.clone(), IndexName::from(logical_index))
271        }
272        Placement::DedicatedIndex { cluster, index }
273        | Placement::SharedIndex { cluster, index, .. } => {
274            Target::new(cluster.clone(), index.clone())
275        }
276    }
277}
278
279/// Resolves the *context-derived* injected values to constants, using the
280/// request. The `PartitionId` value is left as-is: it is the read-isolation key,
281/// and downstream stages resolve it to the partition, so the read path can tell
282/// the isolation field apart from the decorative (context-derived) ones.
283fn resolve_inject(
284    fields: &[InjectedField],
285    _partition: &PartitionId,
286    ctx: &RequestCtx<'_>,
287) -> Result<Vec<InjectedField>, SpiError> {
288    fields
289        .iter()
290        .map(|field| {
291            let value = match &field.value {
292                // The isolation field stays symbolic; never filtered on a
293                // context-derived value (which would differ on read).
294                InjectedValue::PartitionId => return Ok(field.clone()),
295                InjectedValue::Constant(constant) => constant.clone(),
296                InjectedValue::FromPrincipal(attr) => ctx
297                    .principal()
298                    .attr(attr)
299                    .map(|v| Value::String(v.to_owned()))
300                    .ok_or_else(|| SpiError::PrincipalAttrMissing { attr: attr.clone() })?,
301                InjectedValue::FromHeader(name) => ctx
302                    .headers()
303                    .get(name)
304                    .map(|v| Value::String(v.to_owned()))
305                    .ok_or_else(|| SpiError::HeaderMissing {
306                        header: name.clone(),
307                    })?,
308            };
309            Ok(InjectedField::new(
310                field.name.clone(),
311                InjectedValue::Constant(value),
312            ))
313        })
314        .collect()
315}
316
317#[cfg(test)]
318#[path = "router_tests.rs"]
319mod tests;