osproxy_core/cursor.rs
1//! Stateless cursor-affinity envelope (`docs/03` §6).
2//!
3//! A scroll / PIT cursor is bound to the one physical cluster that created it, so
4//! every follow-up request must reach that same cluster. In a **fleet** the
5//! create and the continue may land on different proxy instances, so a binding
6//! kept in one instance's memory is invisible to the others.
7//!
8//! This module makes the binding **travel with the cursor** instead of living in
9//! a shared store: on create the proxy wraps the upstream cursor id together with
10//! its cluster into a signed token the client echoes back; on continue *any*
11//! instance recovers the cluster from the token alone, no shared state, no
12//! replication lag, no read-after-write race. The upstream id is carried
13//! verbatim as the payload (we never need spare room *inside* it), and the proxy
14//! strips the envelope before talking upstream, so OpenSearch never sees it.
15//!
16//! The signature binds the cluster to *this* cursor: a client cannot redirect a
17//! cursor to another cluster (the tag would not verify), and a tampered token
18//! fails closed to "unresolvable cursor", never a wrong-cluster dispatch.
19
20use crate::ids::ClusterId;
21
/// Signs the cluster↔cursor binding.
///
/// The concrete HMAC implementation (behind the build's crypto provider) lives
/// in the binary; this seam keeps the codec in this module pure and lets tests
/// inject a deterministic signer.
///
/// # Contract
///
/// The tag MUST be a deterministic function of `msg` and a fleet-wide shared
/// key, so an instance that did not create the cursor still verifies it:
/// verification in this module re-derives the tag from the token's fields and
/// compares it with the tag the client sent back.
///
/// Implementations should never return an empty tag. The comparison in this
/// module treats an empty expected tag as equal to an empty tag field, so such
/// a signer would authenticate nothing.
pub trait CursorSigner: Send + Sync {
    /// Returns a tag authenticating `msg`.
    ///
    /// Same key + same `msg` ⇒ same tag, on every instance.
    fn tag(&self, msg: &[u8]) -> Vec<u8>;
}
33
34/// Wraps `cursor` (the upstream scroll/PIT id) with `cluster` into a signed,
35/// self-describing token for the client. Format `{cluster_hex}.{tag_hex}.{cursor}`,
36/// the cursor verbatim (it is base64, so it never contains the `.` delimiter).
37#[must_use]
38pub fn wrap(signer: &dyn CursorSigner, cluster: &ClusterId, cursor: &str) -> String {
39 let tag = signer.tag(&binding(cluster, cursor));
40 // Pre-size to the exact token length so the String never reallocates while
41 // framing: two hex fields (2 chars/byte), two `.` separators, the cursor.
42 let mut out =
43 String::with_capacity(cluster.as_str().len() * 2 + tag.len() * 2 + 2 + cursor.len());
44 push_hex(&mut out, cluster.as_str().as_bytes());
45 out.push('.');
46 push_hex(&mut out, &tag);
47 out.push('.');
48 out.push_str(cursor);
49 out
50}
51
52/// Recovers `(cluster, upstream cursor)` from a token produced by [`wrap`], or
53/// `None` if it is malformed or its signature does not verify (**fail-closed**,
54/// a bad token is never routed anywhere).
55#[must_use]
56pub fn unwrap(signer: &dyn CursorSigner, token: &str) -> Option<(ClusterId, String)> {
57 let mut parts = token.splitn(3, '.');
58 let cluster_hex = parts.next()?;
59 let tag_hex = parts.next()?;
60 let cursor = parts.next()?;
61 // Decode the cluster hex straight into the owned String the id will hold, so
62 // there is no intermediate byte Vec to free (the id is move-constructed).
63 let cluster = ClusterId::from(decode_hex_to_string(cluster_hex)?);
64 // Verify by re-deriving the expected tag and comparing it against the
65 // provided hex *in place*, no decoded-tag Vec is allocated. The compare is
66 // constant-time over content for an equal length, like `constant_time_eq`.
67 let expected = signer.tag(&binding(&cluster, cursor));
68 if hex_eq_ct(tag_hex, &expected) {
69 Some((cluster, cursor.to_owned()))
70 } else {
71 None
72 }
73}
74
75/// The signed message: `cluster` and `cursor` joined by a byte that cannot appear
76/// in either (a unit separator), so neither field can be shifted into the other.
77fn binding(cluster: &ClusterId, cursor: &str) -> Vec<u8> {
78 let mut msg = Vec::with_capacity(cluster.as_str().len() + 1 + cursor.len());
79 msg.extend_from_slice(cluster.as_str().as_bytes());
80 msg.push(0x1f);
81 msg.extend_from_slice(cursor.as_bytes());
82 msg
83}
84
85/// Constant-time equality of `expected` (raw bytes) against a hex string, without
86/// decoding the hex into its own buffer. A length mismatch is unequal; for an
87/// equal length the loop runs to completion without an early return, so a forged
88/// tag leaks no timing signal about how many bytes matched. A non-hex digit makes
89/// the whole comparison unequal (a malformed tag is a forged tag).
90fn hex_eq_ct(hex: &str, expected: &[u8]) -> bool {
91 let hex = hex.as_bytes();
92 if hex.len() != expected.len() * 2 {
93 return false;
94 }
95 let mut diff = 0u8;
96 for (pair, &want) in hex.chunks_exact(2).zip(expected.iter()) {
97 match (hex_val(pair[0]), hex_val(pair[1])) {
98 (Some(hi), Some(lo)) => diff |= ((hi << 4) | lo) ^ want,
99 // Mark a mismatch but keep scanning: no early return on content.
100 _ => diff |= 1,
101 }
102 }
103 diff == 0
104}
105
/// Appends `bytes` to `out` as lowercase hex, two digits per byte with the high
/// nibble first. Existing contents of `out` are kept.
fn push_hex(out: &mut String, bytes: &[u8]) {
    const LOWER_HEX: &[u8; 16] = b"0123456789abcdef";
    out.extend(
        bytes
            .iter()
            // Split each byte into its (high, low) nibbles, in output order.
            .flat_map(|&byte| [byte >> 4, byte & 0x0f])
            .map(|nibble| char::from(LOWER_HEX[usize::from(nibble)])),
    );
}
114
115/// Decodes a lowercase/uppercase hex string directly into an owned UTF-8 string,
116/// or `None` on an odd length, a non-hex digit, or non-UTF-8 bytes. The decode
117/// buffer becomes the `String` without a second allocation, so recovering the
118/// cluster id costs one allocation rather than a byte `Vec` plus a copy.
119fn decode_hex_to_string(hex: &str) -> Option<String> {
120 if !hex.len().is_multiple_of(2) {
121 return None;
122 }
123 let bytes = hex.as_bytes();
124 let mut out = Vec::with_capacity(hex.len() / 2);
125 for pair in bytes.chunks_exact(2) {
126 let hi = hex_val(pair[0])?;
127 let lo = hex_val(pair[1])?;
128 out.push((hi << 4) | lo);
129 }
130 String::from_utf8(out).ok()
131}
132
/// Maps one ASCII hex digit (`0-9`, `a-f`, `A-F`) to its numeric value `0..=15`;
/// any other byte yields `None`.
fn hex_val(c: u8) -> Option<u8> {
    // Folding case first leaves only two digit ranges to handle. Bytes outside
    // `A-Z` pass through unchanged, so nothing else becomes a hex digit.
    match c.to_ascii_lowercase() {
        digit @ b'0'..=b'9' => Some(digit - b'0'),
        letter @ b'a'..=b'f' => Some(letter - b'a' + 10),
        _ => None,
    }
}
142
143#[cfg(test)]
144#[path = "cursor_tests.rs"]
145mod tests;