osproxy_tenancy/router.rs
1//! Adapts a high-level [`TenancySpi`] into the low-level [`RoutingSpi`].
2//!
3//! This is where declarative tenancy rules become a concrete routing decision:
4//! resolve the partition, look up its placement, derive the physical target,
5//! and assemble the body transform (with injected-field values already resolved
6//! to constants, so downstream stages stay pure). The `SharedIndex`
7//! partition-in-id invariant (`docs/03`) is enforced here.
8
9use osproxy_core::{ClusterId, Epoch, IndexName, PartitionId, Target};
10use osproxy_spi::{
11 BodyDoc, BodyTransform, InjectedField, InjectedValue, MigrationPhase, Placement, RequestCtx,
12 RouteDecision, RoutingSpi, SpiError, TenancySpi,
13};
14use serde_json::Value;
15
16/// A fully resolved routing decision plus the partition it was resolved for.
17///
18/// The engine consumes this richer result directly (it needs the partition to
19/// construct the document `_id` and `_routing`); the [`RoutingSpi`] impl exposes
20/// just the [`RouteDecision`].
21#[derive(Clone, PartialEq, Eq, Debug)]
22pub struct Resolved {
23 /// The resolved partition id.
24 pub partition: PartitionId,
25 /// The routing decision derived from the partition's placement.
26 pub decision: RouteDecision,
27 /// The partition's migration phase at resolve time (shape-only, for
28 /// observability, `docs/06` §5).
29 pub migration: MigrationPhase,
30}
31
/// Adapter that exposes a [`TenancySpi`] implementation as a [`RoutingSpi`].
#[derive(Debug)]
pub struct TenancyRouter<T> {
    /// The wrapped tenancy implementation that the router delegates to.
    spi: T,
}
37
38impl<T: TenancySpi> TenancyRouter<T> {
39 /// Wraps a tenancy implementation.
40 #[must_use]
41 pub fn new(spi: T) -> Self {
42 Self { spi }
43 }
44
45 /// The wrapped tenancy implementation.
46 #[must_use]
47 pub fn spi(&self) -> &T {
48 &self.spi
49 }
50
51 /// The migration write gate for a resolved decision (`docs/06` §2): whether a
52 /// write that resolved at `epoch` for `partition` may still commit. The
53 /// engine calls this at dispatch; `false` is surfaced as a retryable
54 /// stale-epoch error. Delegates to the [`TenancySpi`].
55 pub async fn admit_write(&self, partition: &PartitionId, epoch: Epoch) -> bool {
56 self.spi.admit_write(partition, epoch).await
57 }
58
59 /// Resolves the full routing plan for `ctx` (the single-document path).
60 ///
61 /// # Errors
62 ///
63 /// Returns [`SpiError`] if the endpoint is not tenancy-aware, the partition
64 /// cannot be resolved, no placement exists, or the configured transforms are
65 /// invalid (e.g. a shared-index id rule that omits the partition).
66 pub async fn resolve(&self, ctx: &RequestCtx<'_>) -> Result<Resolved, SpiError> {
67 if !ctx.endpoint().is_tenancy_aware() {
68 return Err(SpiError::UnsupportedEndpoint {
69 endpoint: ctx.endpoint(),
70 });
71 }
72 // The body is scanned on demand for the partition key, never parsed into
73 // a JSON tree (ADR-014).
74 let partition = self.resolve_partition(ctx, BodyDoc::new(ctx.body()))?;
75 self.resolve_placement(ctx, partition, ctx.logical_index())
76 .await
77 }
78
79 /// Resolves just the partition id for a request and document, without a
80 /// placement lookup. The per-document entry point for bulk demux (`docs/04`
81 /// §3), where each operation carries its own source as a [`BodyDoc`].
82 ///
83 /// # Errors
84 ///
85 /// Returns [`SpiError::PartitionUnresolved`] if no configured source yields
86 /// the partition.
87 pub fn resolve_partition(
88 &self,
89 ctx: &RequestCtx<'_>,
90 body: BodyDoc<'_>,
91 ) -> Result<PartitionId, SpiError> {
92 self.spi.resolve_partition(ctx, body)
93 }
94
95 /// Resolves a known partition to its placement and the routing plan for a
96 /// given logical index. Separated from [`Self::resolve_partition`] so a bulk
97 /// request can resolve the partition per document but cache the placement
98 /// per partition.
99 ///
100 /// # Errors
101 ///
102 /// Returns [`SpiError`] if no placement exists or the configured transforms
103 /// are invalid (e.g. a shared-index id rule that omits the partition).
104 pub async fn resolve_placement(
105 &self,
106 ctx: &RequestCtx<'_>,
107 partition: PartitionId,
108 logical_index: &str,
109 ) -> Result<Resolved, SpiError> {
110 let at = self.spi.placement_for(&partition).await?;
111 // Carry the cluster's endpoint (from the placement result) onto the
112 // target so the sink can pool it, the tenancy is the source of truth for
113 // where each cluster lives.
114 let target = target_for(&at.placement, logical_index).with_endpoint(at.endpoint.clone());
115 let body_transform = self.build_transform(&at.placement, &partition, ctx)?;
116 let decision = RouteDecision {
117 target,
118 upstream_protocol: ctx.protocol(),
119 header_ops: Vec::new(),
120 body_transform,
121 epoch: at.epoch,
122 };
123 Ok(Resolved {
124 partition,
125 decision,
126 migration: at.phase,
127 })
128 }
129
130 /// Builds the body transform for a placement, resolving injected-field
131 /// values and enforcing the shared-index partition-in-id invariant.
132 fn build_transform(
133 &self,
134 placement: &Placement,
135 partition: &PartitionId,
136 ctx: &RequestCtx<'_>,
137 ) -> Result<BodyTransform, SpiError> {
138 let inject = match placement {
139 Placement::SharedIndex { inject, .. } => resolve_inject(inject, partition, ctx)?,
140 Placement::DedicatedCluster { .. } | Placement::DedicatedIndex { .. } => Vec::new(),
141 };
142
143 let id_rule = self.spi.doc_id_rule();
144 // In SharedIndex mode the partition id is MANDATORY in the doc-id template
145 // (docs/03 §4): by-id reads/writes (`_doc/{id}`) bypass the query filter and
146 // hit the physical id directly, so without a partition-scoped id two tenants
147 // collide on the same `_id`, a cross-tenant overwrite on write and a
148 // cross-tenant read on get. A *missing* rule is as unsafe as a partition-free
149 // one, so reject both here rather than only validating a rule that happens to
150 // be present.
151 if let Placement::SharedIndex { .. } = placement {
152 let partition_scoped = id_rule
153 .as_ref()
154 .is_some_and(|rule| rule.template.references_partition());
155 if !partition_scoped {
156 return Err(SpiError::IdRuleMissingPartition);
157 }
158 }
159
160 Ok(match (inject.is_empty(), id_rule) {
161 (true, None) => BodyTransform::None,
162 (false, None) => BodyTransform::Inject(inject),
163 (true, Some(id)) => BodyTransform::ConstructId(id),
164 (false, Some(id)) => BodyTransform::Both { inject, id },
165 })
166 }
167}
168
169impl<T: TenancySpi> RoutingSpi for TenancyRouter<T> {
170 async fn route(&self, ctx: &RequestCtx<'_>) -> Result<RouteDecision, SpiError> {
171 Ok(self.resolve(ctx).await?.decision)
172 }
173}
174
175/// The partition-aware routing seam the engine pipeline drives.
176///
177/// [`RoutingSpi`] yields only a [`RouteDecision`]; the engine needs more, the
178/// resolved partition (to construct `_id`/`_routing` and to demux bulk per
179/// document), the epoch and migration phase (the write gate), and a split
180/// resolve so a bulk request can resolve the partition per document but cache the
181/// placement per partition. This trait captures exactly that contract, so the
182/// pipeline is generic over *any* router that can provide it, not nailed to the
183/// concrete [`TenancyRouter`]. [`TenancyRouter`] is the in-tree implementation.
184#[allow(
185 async_fn_in_trait,
186 reason = "consumed through generics in the engine, where Send is verified at \
187 the spawn site, mirroring TenancySpi/RoutingSpi (docs/02 §2)"
188)]
189pub trait Router: Send + Sync + 'static {
190 /// Resolves the full routing plan for a single-document request.
191 ///
192 /// # Errors
193 ///
194 /// Returns [`SpiError`] if the endpoint is not tenancy-aware, the partition
195 /// cannot be resolved, no placement exists, or the transforms are invalid.
196 async fn resolve(&self, ctx: &RequestCtx<'_>) -> Result<Resolved, SpiError>;
197
198 /// Resolves just the partition id for a request and document (the bulk demux
199 /// entry point).
200 ///
201 /// # Errors
202 ///
203 /// Returns [`SpiError::PartitionUnresolved`] if no source yields a partition.
204 fn resolve_partition(
205 &self,
206 ctx: &RequestCtx<'_>,
207 body: BodyDoc<'_>,
208 ) -> Result<PartitionId, SpiError>;
209
210 /// Resolves a known partition to its placement and routing plan.
211 ///
212 /// # Errors
213 ///
214 /// Returns [`SpiError`] if no placement exists or the transforms are invalid.
215 async fn resolve_placement(
216 &self,
217 ctx: &RequestCtx<'_>,
218 partition: PartitionId,
219 logical_index: &str,
220 ) -> Result<Resolved, SpiError>;
221
222 /// The migration write gate: may a write that resolved at `epoch` for
223 /// `partition` still commit? `false` ⇒ reject as a retryable stale-epoch error.
224 async fn admit_write(&self, partition: &PartitionId, epoch: Epoch) -> bool;
225
226 /// The base URL of a cluster by id, for the cursor-affinity and admin paths
227 /// that route by cluster without a placement. Default `None`.
228 fn cluster_endpoint(&self, _cluster: &ClusterId) -> Option<String> {
229 None
230 }
231}
232
233impl<T: TenancySpi> Router for TenancyRouter<T> {
234 async fn resolve(&self, ctx: &RequestCtx<'_>) -> Result<Resolved, SpiError> {
235 TenancyRouter::resolve(self, ctx).await
236 }
237
238 fn resolve_partition(
239 &self,
240 ctx: &RequestCtx<'_>,
241 body: BodyDoc<'_>,
242 ) -> Result<PartitionId, SpiError> {
243 TenancyRouter::resolve_partition(self, ctx, body)
244 }
245
246 async fn resolve_placement(
247 &self,
248 ctx: &RequestCtx<'_>,
249 partition: PartitionId,
250 logical_index: &str,
251 ) -> Result<Resolved, SpiError> {
252 TenancyRouter::resolve_placement(self, ctx, partition, logical_index).await
253 }
254
255 async fn admit_write(&self, partition: &PartitionId, epoch: Epoch) -> bool {
256 TenancyRouter::admit_write(self, partition, epoch).await
257 }
258
259 fn cluster_endpoint(&self, cluster: &ClusterId) -> Option<String> {
260 self.spi.cluster_endpoint(cluster)
261 }
262}
263
264/// Derives the physical [`Target`] from a placement and the request's logical
265/// index. A dedicated cluster carries the logical index name unchanged; the
266/// other modes pin a concrete physical index.
267fn target_for(placement: &Placement, logical_index: &str) -> Target {
268 match placement {
269 Placement::DedicatedCluster { cluster } => {
270 Target::new(cluster.clone(), IndexName::from(logical_index))
271 }
272 Placement::DedicatedIndex { cluster, index }
273 | Placement::SharedIndex { cluster, index, .. } => {
274 Target::new(cluster.clone(), index.clone())
275 }
276 }
277}
278
279/// Resolves the *context-derived* injected values to constants, using the
280/// request. The `PartitionId` value is left as-is: it is the read-isolation key,
281/// and downstream stages resolve it to the partition, so the read path can tell
282/// the isolation field apart from the decorative (context-derived) ones.
283fn resolve_inject(
284 fields: &[InjectedField],
285 _partition: &PartitionId,
286 ctx: &RequestCtx<'_>,
287) -> Result<Vec<InjectedField>, SpiError> {
288 fields
289 .iter()
290 .map(|field| {
291 let value = match &field.value {
292 // The isolation field stays symbolic; never filtered on a
293 // context-derived value (which would differ on read).
294 InjectedValue::PartitionId => return Ok(field.clone()),
295 InjectedValue::Constant(constant) => constant.clone(),
296 InjectedValue::FromPrincipal(attr) => ctx
297 .principal()
298 .attr(attr)
299 .map(|v| Value::String(v.to_owned()))
300 .ok_or_else(|| SpiError::PrincipalAttrMissing { attr: attr.clone() })?,
301 InjectedValue::FromHeader(name) => ctx
302 .headers()
303 .get(name)
304 .map(|v| Value::String(v.to_owned()))
305 .ok_or_else(|| SpiError::HeaderMissing {
306 header: name.clone(),
307 })?,
308 };
309 Ok(InjectedField::new(
310 field.name.clone(),
311 InjectedValue::Constant(value),
312 ))
313 })
314 .collect()
315}
316
317#[cfg(test)]
318#[path = "router_tests.rs"]
319mod tests;