osproxy_spi/tenancy.rs
1//! The high-level tenancy contract, what most implementers provide.
2
3use std::future::Future;
4
5use osproxy_core::{ClusterId, Epoch, PartitionId};
6
7use crate::error::SpiError;
8use crate::placement::PlacementAt;
9use crate::request::{BodyDoc, RequestCtx};
10use crate::rules::{DocIdRule, InjectedField, SensitivitySpec};
11
12/// The tenancy-focused contract most implementers provide.
13///
14/// It declares tenancy *rules*, how to find the partition, how to build the
15/// document `_id`, which fields to inject, which are sensitive, plus a
16/// placement lookup. `osproxy-tenancy` turns this into a [`crate::RoutingSpi`],
17/// so tenancy implementers never touch [`crate::RouteDecision`] plumbing
18/// (`docs/02` §2).
19///
20/// # Invariants
21///
22/// - [`TenancySpi::resolve_partition`] MUST yield a partition id for every
23/// routable request, or it returns [`SpiError::PartitionUnresolved`] and the
24/// request is rejected.
25/// - In `SharedIndex` mode the partition id MUST be part of the constructed
26/// `_id` to prevent cross-tenant id collisions (`docs/03`); the adapter
27/// enforces this.
28/// - [`TenancySpi::injected_fields`] names and [`TenancySpi::sensitive_fields`]
29/// MUST be stable for a given logical-index version, so the read-path
30/// strip/filter stays symmetric with the write-path inject.
31///
32/// # Examples
33///
34/// ```
35/// use osproxy_core::{ClusterId, Epoch, FieldName, IndexName, PartitionId};
36/// use osproxy_spi::{
37/// BodyDoc, InjectedField, InjectedValue, Placement, PlacementAt, PartitionKeySpecKind,
38/// RequestCtx, SensitivitySpec, SpiError, TenancySpi,
39/// };
40///
41/// struct OneTenantPerHeader;
42///
43/// impl TenancySpi for OneTenantPerHeader {
44/// fn resolve_partition(&self, ctx: &RequestCtx<'_>, _body: BodyDoc<'_>)
45/// -> Result<PartitionId, SpiError>
46/// {
47/// // Real impls usually defer to `osproxy_tenancy::resolve_partition_spec`;
48/// // here we resolve inline to keep the SPI crate self-contained.
49/// ctx.headers().get("x-tenant").map(PartitionId::from).ok_or(
50/// SpiError::PartitionUnresolved { tried: vec![PartitionKeySpecKind::Header] })
51/// }
52/// fn doc_id_rule(&self) -> Option<osproxy_spi::DocIdRule> { None }
53/// fn injected_fields(&self) -> Vec<InjectedField> {
54/// vec![InjectedField::new(FieldName::from("_tenant"), InjectedValue::PartitionId)]
55/// }
56/// fn sensitive_fields(&self) -> SensitivitySpec { SensitivitySpec::none() }
57/// async fn placement_for(&self, p: &PartitionId) -> Result<PlacementAt, SpiError> {
58/// Ok(PlacementAt::new(
59/// Placement::SharedIndex {
60/// cluster: ClusterId::from("eu-1"),
61/// index: IndexName::from("logs-shared"),
62/// inject: self.injected_fields(),
63/// },
64/// Epoch::ZERO,
65/// ))
66/// }
67/// }
68/// ```
69pub trait TenancySpi: Send + Sync + 'static {
70 /// Resolves the partition id for a request.
71 ///
72 /// `body` is a [`BodyDoc`] view over the document: the whole request for
73 /// single-doc ingest, or one operation's source line for `_bulk`. Read the
74 /// partition key from it with [`BodyDoc::scalar`], the proxy scans the bytes
75 /// on demand, so no JSON tree is built (ADR-014).
76 ///
77 /// Most implementations just defer to the declarative resolver
78 /// `osproxy_tenancy::resolve_partition_spec`, naming the source(s) the
79 /// partition id lives in (a body field, a header, a principal attribute):
80 ///
81 /// ```ignore
82 /// fn resolve_partition(&self, ctx: &RequestCtx<'_>, body: BodyDoc<'_>)
83 /// -> Result<PartitionId, SpiError>
84 /// {
85 /// osproxy_tenancy::resolve_partition_spec(
86 /// &PartitionKeySpec::BodyField(JsonPath::new("tenant_id")), ctx, body)
87 /// }
88 /// ```
89 ///
90 /// Compose [`BodyDoc::scalar`] with header/principal lookups for cases the
91 /// declarative sources cannot express, combining several inputs, decoding a
92 /// structured token, without ever parsing raw bytes yourself. You choose the
93 /// order; nothing is tried implicitly before you.
94 ///
95 /// # Errors
96 ///
97 /// Returns [`SpiError::PartitionUnresolved`] when no configured source yields a
98 /// partition id; the request is then rejected.
99 ///
100 /// The no-value-leak rule holds (NFR-S2): whatever you decode here must not be
101 /// logged. The id you return is treated as a partition id (an opaque routing
102 /// key), never as a tenant *value* to capture.
103 fn resolve_partition(
104 &self,
105 ctx: &RequestCtx<'_>,
106 body: BodyDoc<'_>,
107 ) -> Result<PartitionId, SpiError>;
108
109 /// Optional rule to construct the document `_id` (and `_routing`).
110 fn doc_id_rule(&self) -> Option<DocIdRule>;
111
112 /// Fields injected on ingest and stripped on read. The field *names* are
113 /// chosen here (the SPI decides them).
114 fn injected_fields(&self) -> Vec<InjectedField>;
115
116 /// Declares which field *values* observability may capture, driving
117 /// value-suppression (NFR-S2). Deny-by-default: the standard implementation
118 /// returns [`SensitivitySpec::all_sensitive`] (everything redacted) and
119 /// allow-lists known-safe fields with [`SensitivitySpec::allowing`]. The
120 /// default here is `all_sensitive`, so a tenancy that does not override it
121 /// leaks nothing.
122 fn sensitive_fields(&self) -> SensitivitySpec {
123 SensitivitySpec::all_sensitive()
124 }
125
126 /// Resolves a partition to its current placement and the epoch it was read
127 /// at. NOT a pure function, migration mutates the placement state.
128 ///
129 /// # Errors
130 ///
131 /// Returns [`SpiError::PlacementMissing`] when the partition has no
132 /// placement, or [`SpiError::PlacementBackend`] when the lookup backend is
133 /// unavailable.
134 fn placement_for(
135 &self,
136 partition: &PartitionId,
137 ) -> impl Future<Output = Result<PlacementAt, SpiError>> + Send;
138
139 /// The migration write gate (`docs/06` §2): may a write that resolved at
140 /// `epoch` for `partition` still commit? Re-checked at dispatch, after the
141 /// decision was stamped, so a placement that advanced (or entered cutover) in
142 /// the meantime is caught. `false` means reject as a retryable stale-epoch
143 /// error; the client re-resolves against the new placement.
144 ///
145 /// Defaults to always-admit: an implementation without live migration (a
146 /// constant placement) never needs to hold a write.
147 fn admit_write(
148 &self,
149 _partition: &PartitionId,
150 _epoch: Epoch,
151 ) -> impl Future<Output = bool> + Send {
152 async { true }
153 }
154
155 /// The base URL of a cluster, by id. The data plane carries each cluster's
156 /// endpoint on the placement result, but the cursor-affinity and admin
157 /// pass-through paths route to a cluster by id with no placement to consult,
158 /// so they resolve the endpoint through this lookup. Return `None` for an
159 /// unknown cluster; the request then fails closed rather than route blind.
160 ///
161 /// Default `None`. A tenancy that runs cursor affinity or admin pass-through
162 /// against `OpenSearchSink` must implement it for the clusters those paths
163 /// reach (which is just its own cluster catalog by id).
164 fn cluster_endpoint(&self, _cluster: &ClusterId) -> Option<String> {
165 None
166 }
167}