osproxy_tenancy/router.rs
1//! Adapts a high-level [`TenancySpi`] into the low-level [`RoutingSpi`].
2//!
3//! This is where declarative tenancy rules become a concrete routing decision:
4//! resolve the partition, look up its placement, derive the physical target,
//! and assemble the body transform (with context-derived injected-field values
//! already resolved to constants, so downstream stages stay pure; the
//! partition-id field stays symbolic as the read-isolation key). The `SharedIndex`
7//! partition-in-id invariant (`docs/03`) is enforced here.
8
9use std::future::Future;
10
11use osproxy_core::{ClusterId, Epoch, IndexName, PartitionId, Target};
12use osproxy_spi::{
13 BodyDoc, BodyTransform, InjectedField, InjectedValue, MigrationPhase, Placement, RequestCtx,
14 RouteDecision, RoutingSpi, SpiError, TenancySpi,
15};
16use serde_json::Value;
17
18/// A fully resolved routing decision plus the partition it was resolved for.
19///
20/// The engine consumes this richer result directly (it needs the partition to
21/// construct the document `_id` and `_routing`); the [`RoutingSpi`] impl exposes
22/// just the [`RouteDecision`].
23#[derive(Clone, PartialEq, Eq, Debug)]
24pub struct Resolved {
25 /// The resolved partition id.
26 pub partition: PartitionId,
27 /// The routing decision derived from the partition's placement.
28 pub decision: RouteDecision,
29 /// The partition's migration phase at resolve time (shape-only, for
30 /// observability, `docs/06` §5).
31 pub migration: MigrationPhase,
32}
33
/// Adapter that exposes a [`TenancySpi`] implementation as a [`RoutingSpi`].
#[derive(Debug)]
pub struct TenancyRouter<T> {
    /// The wrapped tenancy implementation every decision is delegated to.
    spi: T,
}
39
40impl<T: TenancySpi> TenancyRouter<T> {
41 /// Wraps a tenancy implementation.
42 #[must_use]
43 pub fn new(spi: T) -> Self {
44 Self { spi }
45 }
46
47 /// The wrapped tenancy implementation.
48 #[must_use]
49 pub fn spi(&self) -> &T {
50 &self.spi
51 }
52
53 /// The migration write gate for a resolved decision (`docs/06` §2): whether a
54 /// write that resolved at `epoch` for `partition` may still commit. The
55 /// engine calls this at dispatch; `false` is surfaced as a retryable
56 /// stale-epoch error. Delegates to the [`TenancySpi`].
57 pub async fn admit_write(&self, partition: &PartitionId, epoch: Epoch) -> bool {
58 self.spi.admit_write(partition, epoch).await
59 }
60
61 /// Resolves the full routing plan for `ctx` (the single-document path).
62 ///
63 /// # Errors
64 ///
65 /// Returns [`SpiError`] if the endpoint is not tenancy-aware, the partition
66 /// cannot be resolved, no placement exists, or the configured transforms are
67 /// invalid (e.g. a shared-index id rule that omits the partition).
68 pub async fn resolve(&self, ctx: &RequestCtx<'_>) -> Result<Resolved, SpiError> {
69 if !ctx.endpoint().is_tenancy_aware() {
70 return Err(SpiError::UnsupportedEndpoint {
71 endpoint: ctx.endpoint(),
72 });
73 }
74 // The body is scanned on demand for the partition key, never parsed into
75 // a JSON tree (ADR-014).
76 let partition = self.resolve_partition(ctx, BodyDoc::new(ctx.body()))?;
77 self.resolve_placement(ctx, partition, ctx.logical_index())
78 .await
79 }
80
81 /// Resolves just the partition id for a request and document, without a
82 /// placement lookup. The per-document entry point for bulk demux (`docs/04`
83 /// §3), where each operation carries its own source as a [`BodyDoc`].
84 ///
85 /// # Errors
86 ///
87 /// Returns [`SpiError::PartitionUnresolved`] if no configured source yields
88 /// the partition.
89 pub fn resolve_partition(
90 &self,
91 ctx: &RequestCtx<'_>,
92 body: BodyDoc<'_>,
93 ) -> Result<PartitionId, SpiError> {
94 self.spi.resolve_partition(ctx, body)
95 }
96
97 /// Resolves a known partition to its placement and the routing plan for a
98 /// given logical index. Separated from [`Self::resolve_partition`] so a bulk
99 /// request can resolve the partition per document but cache the placement
100 /// per partition.
101 ///
102 /// # Errors
103 ///
104 /// Returns [`SpiError`] if no placement exists or the configured transforms
105 /// are invalid (e.g. a shared-index id rule that omits the partition).
106 pub async fn resolve_placement(
107 &self,
108 ctx: &RequestCtx<'_>,
109 partition: PartitionId,
110 logical_index: &str,
111 ) -> Result<Resolved, SpiError> {
112 let at = self.spi.placement_for(&partition).await?;
113 // Carry the cluster's endpoint (from the placement result) onto the
114 // target so the sink can pool it, the tenancy is the source of truth for
115 // where each cluster lives.
116 let target = target_for(&at.placement, logical_index).with_endpoint(at.endpoint.clone());
117 let body_transform = self.build_transform(&at.placement, &partition, ctx)?;
118 let decision = RouteDecision {
119 target,
120 upstream_protocol: ctx.protocol(),
121 header_ops: Vec::new(),
122 body_transform,
123 epoch: at.epoch,
124 };
125 Ok(Resolved {
126 partition,
127 decision,
128 migration: at.phase,
129 })
130 }
131
132 /// Builds the body transform for a placement, resolving injected-field
133 /// values and enforcing the shared-index partition-in-id invariant.
134 fn build_transform(
135 &self,
136 placement: &Placement,
137 partition: &PartitionId,
138 ctx: &RequestCtx<'_>,
139 ) -> Result<BodyTransform, SpiError> {
140 let inject = match placement {
141 Placement::SharedIndex { inject, .. } => resolve_inject(inject, partition, ctx)?,
142 Placement::DedicatedCluster { .. } | Placement::DedicatedIndex { .. } => Vec::new(),
143 };
144
145 let id_rule = self.spi.doc_id_rule();
146 // In SharedIndex mode the partition id is MANDATORY in the doc-id template
147 // (docs/03 §4): by-id reads/writes (`_doc/{id}`) bypass the query filter and
148 // hit the physical id directly, so without a partition-scoped id two tenants
149 // collide on the same `_id`, a cross-tenant overwrite on write and a
150 // cross-tenant read on get. A *missing* rule is as unsafe as a partition-free
151 // one, so reject both here rather than only validating a rule that happens to
152 // be present.
153 if let Placement::SharedIndex { .. } = placement {
154 let partition_scoped = id_rule
155 .as_ref()
156 .is_some_and(|rule| rule.template.references_partition());
157 if !partition_scoped {
158 return Err(SpiError::IdRuleMissingPartition);
159 }
160 }
161
162 Ok(match (inject.is_empty(), id_rule) {
163 (true, None) => BodyTransform::None,
164 (false, None) => BodyTransform::Inject(inject),
165 (true, Some(id)) => BodyTransform::ConstructId(id),
166 (false, Some(id)) => BodyTransform::Both { inject, id },
167 })
168 }
169}
170
171impl<T: TenancySpi> RoutingSpi for TenancyRouter<T> {
172 async fn route(&self, ctx: &RequestCtx<'_>) -> Result<RouteDecision, SpiError> {
173 Ok(self.resolve(ctx).await?.decision)
174 }
175}
176
177/// The partition-aware routing seam the engine pipeline drives.
178///
179/// [`RoutingSpi`] yields only a [`RouteDecision`]; the engine needs more, the
180/// resolved partition (to construct `_id`/`_routing` and to demux bulk per
181/// document), the epoch and migration phase (the write gate), and a split
182/// resolve so a bulk request can resolve the partition per document but cache the
183/// placement per partition. This trait captures exactly that contract, so the
184/// pipeline is generic over *any* router that can provide it, not nailed to the
185/// concrete [`TenancyRouter`]. [`TenancyRouter`] is the in-tree implementation.
186pub trait Router: Send + Sync + 'static {
187 /// Resolves the full routing plan for a single-document request.
188 ///
189 /// # Errors
190 ///
191 /// Returns [`SpiError`] if the endpoint is not tenancy-aware, the partition
192 /// cannot be resolved, no placement exists, or the transforms are invalid.
193 fn resolve(
194 &self,
195 ctx: &RequestCtx<'_>,
196 ) -> impl Future<Output = Result<Resolved, SpiError>> + Send;
197
198 /// Resolves just the partition id for a request and document (the bulk demux
199 /// entry point).
200 ///
201 /// # Errors
202 ///
203 /// Returns [`SpiError::PartitionUnresolved`] if no source yields a partition.
204 fn resolve_partition(
205 &self,
206 ctx: &RequestCtx<'_>,
207 body: BodyDoc<'_>,
208 ) -> Result<PartitionId, SpiError>;
209
210 /// Resolves a known partition to its placement and routing plan.
211 ///
212 /// # Errors
213 ///
214 /// Returns [`SpiError`] if no placement exists or the transforms are invalid.
215 fn resolve_placement(
216 &self,
217 ctx: &RequestCtx<'_>,
218 partition: PartitionId,
219 logical_index: &str,
220 ) -> impl Future<Output = Result<Resolved, SpiError>> + Send;
221
222 /// The migration write gate: may a write that resolved at `epoch` for
223 /// `partition` still commit? `false` ⇒ reject as a retryable stale-epoch error.
224 fn admit_write(
225 &self,
226 partition: &PartitionId,
227 epoch: Epoch,
228 ) -> impl Future<Output = bool> + Send;
229
230 /// The base URL of a cluster by id, for the cursor-affinity and admin paths
231 /// that route by cluster without a placement. Default `None`.
232 fn cluster_endpoint(&self, _cluster: &ClusterId) -> Option<String> {
233 None
234 }
235}
236
237impl<T: TenancySpi> Router for TenancyRouter<T> {
238 async fn resolve(&self, ctx: &RequestCtx<'_>) -> Result<Resolved, SpiError> {
239 TenancyRouter::resolve(self, ctx).await
240 }
241
242 fn resolve_partition(
243 &self,
244 ctx: &RequestCtx<'_>,
245 body: BodyDoc<'_>,
246 ) -> Result<PartitionId, SpiError> {
247 TenancyRouter::resolve_partition(self, ctx, body)
248 }
249
250 async fn resolve_placement(
251 &self,
252 ctx: &RequestCtx<'_>,
253 partition: PartitionId,
254 logical_index: &str,
255 ) -> Result<Resolved, SpiError> {
256 TenancyRouter::resolve_placement(self, ctx, partition, logical_index).await
257 }
258
259 async fn admit_write(&self, partition: &PartitionId, epoch: Epoch) -> bool {
260 TenancyRouter::admit_write(self, partition, epoch).await
261 }
262
263 fn cluster_endpoint(&self, cluster: &ClusterId) -> Option<String> {
264 self.spi.cluster_endpoint(cluster)
265 }
266}
267
268/// Derives the physical [`Target`] from a placement and the request's logical
269/// index. A dedicated cluster carries the logical index name unchanged; the
270/// other modes pin a concrete physical index.
271fn target_for(placement: &Placement, logical_index: &str) -> Target {
272 match placement {
273 Placement::DedicatedCluster { cluster } => {
274 Target::new(cluster.clone(), IndexName::from(logical_index))
275 }
276 Placement::DedicatedIndex { cluster, index }
277 | Placement::SharedIndex { cluster, index, .. } => {
278 Target::new(cluster.clone(), index.clone())
279 }
280 }
281}
282
283/// Resolves the *context-derived* injected values to constants, using the
284/// request. The `PartitionId` value is left as-is: it is the read-isolation key,
285/// and downstream stages resolve it to the partition, so the read path can tell
286/// the isolation field apart from the decorative (context-derived) ones.
287fn resolve_inject(
288 fields: &[InjectedField],
289 _partition: &PartitionId,
290 ctx: &RequestCtx<'_>,
291) -> Result<Vec<InjectedField>, SpiError> {
292 fields
293 .iter()
294 .map(|field| {
295 let value = match &field.value {
296 // The isolation field stays symbolic; never filtered on a
297 // context-derived value (which would differ on read).
298 InjectedValue::PartitionId => return Ok(field.clone()),
299 InjectedValue::Constant(constant) => constant.clone(),
300 InjectedValue::FromPrincipal(attr) => ctx
301 .principal()
302 .attr(attr)
303 .map(|v| Value::String(v.to_owned()))
304 .ok_or_else(|| SpiError::PrincipalAttrMissing { attr: attr.clone() })?,
305 InjectedValue::FromHeader(name) => ctx
306 .headers()
307 .get(name)
308 .map(|v| Value::String(v.to_owned()))
309 .ok_or_else(|| SpiError::HeaderMissing {
310 header: name.clone(),
311 })?,
312 };
313 Ok(InjectedField::new(
314 field.name.clone(),
315 InjectedValue::Constant(value),
316 ))
317 })
318 .collect()
319}
320
// Unit tests for this module live in the sibling file `router_tests.rs`. They
// are compiled only for test builds.
#[cfg(test)]
#[path = "router_tests.rs"]
mod tests;