osproxy_control/affinity.rs
1//! Cursor (scroll / PIT) affinity: pinning a cursor's follow-up requests to the
2//! physical cluster that created it (`docs/03` §6).
3//!
4//! A scroll id or point-in-time id is bound to the cluster that opened it; a
5//! continuation sent elsewhere is meaningless. So when affinity is **on**
6//! ([`Affinity::Pin`]), the proxy records `cursor_id -> cluster` when a cursor is
7//! created and resolves follow-ups to that cluster, bypassing the normal
//! partition-resolution path. The binding is **bounded and TTL'd**: it expires
//! with the cursor's keep-alive and the map is capacity-capped, so a flood of
//! cursors cannot grow memory without limit (NFR-P). Affinity is opt-in and off
//! by default, so deployments that do not use cursors pay no state cost.
12//!
13//! Time comes from an injected [`Clock`], so expiry is deterministic in tests.
14
15use std::collections::HashMap;
16use std::sync::{Arc, Mutex};
17use std::time::Duration;
18
19use osproxy_core::{Clock, ClusterId, Instant, SystemClock};
20
/// Default lifetime of a cursor binding (five minutes). A binding that is not
/// pinned again within this window expires; the value is meant to match a
/// typical scroll/PIT keep-alive.
pub const DEFAULT_CURSOR_TTL: Duration = Duration::from_secs(5 * 60);

/// Default upper bound on the number of cursor bindings held at once, which
/// keeps affinity memory bounded (NFR-P).
pub const DEFAULT_CAPACITY: usize = 100_000;
27
/// Cursor-affinity mode: does the proxy route a cursor's follow-up requests
/// back to the cluster that created the cursor (`docs/03` §6)?
///
/// Pinning is opt-in. The default is [`Affinity::Off`], so deployments that do
/// not use cursors pay no state cost.
#[derive(Debug, Default, Clone, Copy, Eq, PartialEq)]
pub enum Affinity {
    /// Pinning disabled: cursor requests take the normal resolution path.
    #[default]
    Off,
    /// Pinning enabled: follow-ups for a cursor go to its creating cluster.
    Pin,
}
39
40/// One cursor's binding: the cluster that owns it and when it was pinned.
41#[derive(Clone, Debug)]
42struct Pinned {
43 cluster: ClusterId,
44 pinned_at: Instant,
45}
46
47/// A bounded, TTL'd map from cursor id to the cluster that created it
48/// (`docs/03` §6). Cloneable handles are not provided; wrap in an `Arc` to share.
49pub struct CursorAffinity {
50 clock: Arc<dyn Clock>,
51 ttl: Duration,
52 capacity: usize,
53 entries: Mutex<HashMap<String, Pinned>>,
54}
55
56impl std::fmt::Debug for CursorAffinity {
57 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58 // The injected `Clock` is not `Debug`; the rest is the useful shape.
59 f.debug_struct("CursorAffinity")
60 .field("ttl", &self.ttl)
61 .field("capacity", &self.capacity)
62 .field("live", &self.len())
63 .finish_non_exhaustive()
64 }
65}
66
67impl CursorAffinity {
68 /// Builds a cursor-affinity map with the given binding TTL and capacity,
69 /// reading time from the system clock.
70 #[must_use]
71 pub fn new(ttl: Duration, capacity: usize) -> Self {
72 Self {
73 clock: Arc::new(SystemClock),
74 ttl,
75 capacity: capacity.max(1),
76 entries: Mutex::new(HashMap::new()),
77 }
78 }
79
80 /// Swaps the clock that drives expiry (tests inject a `ManualClock`).
81 #[must_use]
82 pub fn with_clock(mut self, clock: Arc<dyn Clock>) -> Self {
83 self.clock = clock;
84 self
85 }
86
87 /// Records that `cursor_id` lives on `cluster`. Expired bindings are swept
88 /// first; if the map is still at capacity, the oldest binding is evicted so
89 /// the new one fits (bounded memory).
90 pub fn pin(&self, cursor_id: impl Into<String>, cluster: ClusterId) {
91 let now = self.clock.now();
92 let mut entries = self.lock();
93 entries.retain(|_, p| !self.is_expired(p, now));
94 if entries.len() >= self.capacity {
95 if let Some(oldest) = entries
96 .iter()
97 .min_by_key(|(_, p)| p.pinned_at)
98 .map(|(k, _)| k.clone())
99 {
100 entries.remove(&oldest);
101 }
102 }
103 entries.insert(
104 cursor_id.into(),
105 Pinned {
106 cluster,
107 pinned_at: now,
108 },
109 );
110 }
111
112 /// The cluster `cursor_id` is pinned to, or `None` if it is unknown or its
113 /// binding has expired (lazy expiry, a stale binding is never returned).
114 #[must_use]
115 pub fn resolve(&self, cursor_id: &str) -> Option<ClusterId> {
116 let now = self.clock.now();
117 let entries = self.lock();
118 entries
119 .get(cursor_id)
120 .filter(|p| !self.is_expired(p, now))
121 .map(|p| p.cluster.clone())
122 }
123
124 /// Drops `cursor_id`'s binding (e.g. on an explicit clear-scroll / close-PIT).
125 pub fn release(&self, cursor_id: &str) {
126 self.lock().remove(cursor_id);
127 }
128
129 /// The number of bindings currently held (including any not yet swept).
130 #[must_use]
131 pub fn len(&self) -> usize {
132 self.lock().len()
133 }
134
135 /// Whether no bindings are held.
136 #[must_use]
137 pub fn is_empty(&self) -> bool {
138 self.lock().is_empty()
139 }
140
141 /// Whether a binding pinned at `p.pinned_at` is past its TTL at `now`.
142 fn is_expired(&self, p: &Pinned, now: Instant) -> bool {
143 now.saturating_duration_since(p.pinned_at) >= self.ttl
144 }
145
146 /// Locks the map, recovering a poisoned lock, it is plain cache data with no
147 /// invariant a panicking holder could tear (NFR-R1).
148 fn lock(&self) -> std::sync::MutexGuard<'_, HashMap<String, Pinned>> {
149 self.entries
150 .lock()
151 .unwrap_or_else(std::sync::PoisonError::into_inner)
152 }
153}