osproxy_spi/tenancy.rs
1//! The high-level tenancy contract, what most implementers provide.
2
3use osproxy_core::{ClusterId, Epoch, PartitionId};
4
5use crate::error::SpiError;
6use crate::placement::PlacementAt;
7use crate::request::{BodyDoc, RequestCtx};
8use crate::rules::{DocIdRule, InjectedField, SensitivitySpec};
9
10/// The tenancy-focused contract most implementers provide.
11///
12/// It declares tenancy *rules*, how to find the partition, how to build the
13/// document `_id`, which fields to inject, which are sensitive, plus a
14/// placement lookup. `osproxy-tenancy` turns this into a [`crate::RoutingSpi`],
15/// so tenancy implementers never touch [`crate::RouteDecision`] plumbing
16/// (`docs/02` §2).
17///
18/// # Invariants
19///
20/// - [`TenancySpi::resolve_partition`] MUST yield a partition id for every
21/// routable request, or it returns [`SpiError::PartitionUnresolved`] and the
22/// request is rejected.
23/// - In `SharedIndex` mode the partition id MUST be part of the constructed
24/// `_id` to prevent cross-tenant id collisions (`docs/03`); the adapter
25/// enforces this.
26/// - [`TenancySpi::injected_fields`] names and [`TenancySpi::sensitive_fields`]
27/// MUST be stable for a given logical-index version, so the read-path
28/// strip/filter stays symmetric with the write-path inject.
29///
30/// # Examples
31///
32/// ```
33/// use osproxy_core::{ClusterId, Epoch, FieldName, IndexName, PartitionId};
34/// use osproxy_spi::{
35/// BodyDoc, InjectedField, InjectedValue, Placement, PlacementAt, PartitionKeySpecKind,
36/// RequestCtx, SensitivitySpec, SpiError, TenancySpi,
37/// };
38///
39/// struct OneTenantPerHeader;
40///
41/// impl TenancySpi for OneTenantPerHeader {
42/// fn resolve_partition(&self, ctx: &RequestCtx<'_>, _body: BodyDoc<'_>)
43/// -> Result<PartitionId, SpiError>
44/// {
45/// // Real impls usually defer to `osproxy_tenancy::resolve_partition_spec`;
46/// // here we resolve inline to keep the SPI crate self-contained.
47/// ctx.headers().get("x-tenant").map(PartitionId::from).ok_or(
48/// SpiError::PartitionUnresolved { tried: vec![PartitionKeySpecKind::Header] })
49/// }
50/// fn doc_id_rule(&self) -> Option<osproxy_spi::DocIdRule> { None }
51/// fn injected_fields(&self) -> Vec<InjectedField> {
52/// vec![InjectedField::new(FieldName::from("_tenant"), InjectedValue::PartitionId)]
53/// }
54/// fn sensitive_fields(&self) -> SensitivitySpec { SensitivitySpec::none() }
55/// async fn placement_for(&self, p: &PartitionId) -> Result<PlacementAt, SpiError> {
56/// Ok(PlacementAt::new(
57/// Placement::SharedIndex {
58/// cluster: ClusterId::from("eu-1"),
59/// index: IndexName::from("logs-shared"),
60/// inject: self.injected_fields(),
61/// },
62/// Epoch::ZERO,
63/// ))
64/// }
65/// }
66/// ```
67#[allow(
68 async_fn_in_trait,
69 reason = "consumed through generics in osproxy-tenancy's adapter; Send is \
70 checked at the engine's spawn site (docs/02 §2)"
71)]
72pub trait TenancySpi: Send + Sync + 'static {
73 /// Resolves the partition id for a request.
74 ///
75 /// `body` is a [`BodyDoc`] view over the document: the whole request for
76 /// single-doc ingest, or one operation's source line for `_bulk`. Read the
77 /// partition key from it with [`BodyDoc::scalar`], the proxy scans the bytes
78 /// on demand, so no JSON tree is built (ADR-014).
79 ///
80 /// Most implementations just defer to the declarative resolver
81 /// `osproxy_tenancy::resolve_partition_spec`, naming the source(s) the
82 /// partition id lives in (a body field, a header, a principal attribute):
83 ///
84 /// ```ignore
85 /// fn resolve_partition(&self, ctx: &RequestCtx<'_>, body: BodyDoc<'_>)
86 /// -> Result<PartitionId, SpiError>
87 /// {
88 /// osproxy_tenancy::resolve_partition_spec(
89 /// &PartitionKeySpec::BodyField(JsonPath::new("tenant_id")), ctx, body)
90 /// }
91 /// ```
92 ///
93 /// Compose [`BodyDoc::scalar`] with header/principal lookups for cases the
94 /// declarative sources cannot express, combining several inputs, decoding a
95 /// structured token, without ever parsing raw bytes yourself. You choose the
96 /// order; nothing is tried implicitly before you.
97 ///
98 /// # Errors
99 ///
100 /// Returns [`SpiError::PartitionUnresolved`] when no configured source yields a
101 /// partition id; the request is then rejected.
102 ///
103 /// The no-value-leak rule holds (NFR-S2): whatever you decode here must not be
104 /// logged. The id you return is treated as a partition id (an opaque routing
105 /// key), never as a tenant *value* to capture.
106 fn resolve_partition(
107 &self,
108 ctx: &RequestCtx<'_>,
109 body: BodyDoc<'_>,
110 ) -> Result<PartitionId, SpiError>;
111
112 /// Optional rule to construct the document `_id` (and `_routing`).
113 fn doc_id_rule(&self) -> Option<DocIdRule>;
114
115 /// Fields injected on ingest and stripped on read. The field *names* are
116 /// chosen here (the SPI decides them).
117 fn injected_fields(&self) -> Vec<InjectedField>;
118
119 /// Declares which field *values* observability may capture, driving
120 /// value-suppression (NFR-S2). Deny-by-default: the standard implementation
121 /// returns [`SensitivitySpec::all_sensitive`] (everything redacted) and
122 /// allow-lists known-safe fields with [`SensitivitySpec::allowing`]. The
123 /// default here is `all_sensitive`, so a tenancy that does not override it
124 /// leaks nothing.
125 fn sensitive_fields(&self) -> SensitivitySpec {
126 SensitivitySpec::all_sensitive()
127 }
128
129 /// Resolves a partition to its current placement and the epoch it was read
130 /// at. NOT a pure function, migration mutates the placement state.
131 ///
132 /// # Errors
133 ///
134 /// Returns [`SpiError::PlacementMissing`] when the partition has no
135 /// placement, or [`SpiError::PlacementBackend`] when the lookup backend is
136 /// unavailable.
137 async fn placement_for(&self, partition: &PartitionId) -> Result<PlacementAt, SpiError>;
138
139 /// The migration write gate (`docs/06` §2): may a write that resolved at
140 /// `epoch` for `partition` still commit? Re-checked at dispatch, after the
141 /// decision was stamped, so a placement that advanced (or entered cutover) in
142 /// the meantime is caught. `false` means reject as a retryable stale-epoch
143 /// error; the client re-resolves against the new placement.
144 ///
145 /// Defaults to always-admit: an implementation without live migration (a
146 /// constant placement) never needs to hold a write.
147 async fn admit_write(&self, _partition: &PartitionId, _epoch: Epoch) -> bool {
148 true
149 }
150
151 /// The base URL of a cluster, by id. The data plane carries each cluster's
152 /// endpoint on the placement result, but the cursor-affinity and admin
153 /// pass-through paths route to a cluster by id with no placement to consult,
154 /// so they resolve the endpoint through this lookup. Return `None` for an
155 /// unknown cluster; the request then fails closed rather than route blind.
156 ///
157 /// Default `None`. A tenancy that runs cursor affinity or admin pass-through
158 /// against `OpenSearchSink` must implement it for the clusters those paths
159 /// reach (which is just its own cluster catalog by id).
160 fn cluster_endpoint(&self, _cluster: &ClusterId) -> Option<String> {
161 None
162 }
163}