Skip to main content

osproxy_control/
affinity.rs

//! Cursor (scroll / PIT) affinity: pinning a cursor's follow-up requests to the
//! physical cluster that created it (`docs/03` §6).
//!
//! A scroll id or point-in-time id is bound to the cluster that opened it; a
//! continuation sent anywhere else is meaningless. So when affinity is **on**
//! ([`Affinity::Pin`]), the proxy records `cursor_id -> cluster` when a cursor is
//! created and resolves follow-ups to that cluster, bypassing the normal
//! partition-resolution path. The binding is **bounded and TTL'd**: it expires
//! with the cursor's keep-alive, and the map is capacity-capped, so a flood of
//! cursors cannot grow memory without limit (NFR-P). Affinity is opt-in and off
//! by default, so deployments that do not use cursors pay no state cost.
//!
//! Time comes from an injected [`Clock`], so expiry is deterministic in tests.
14
15use std::collections::HashMap;
16use std::sync::{Arc, Mutex};
17use std::time::Duration;
18
19use osproxy_core::{Clock, ClusterId, Instant, SystemClock};
20
/// How long a cursor binding lives before it lapses (five minutes), chosen to
/// match a typical scroll/PIT keep-alive. Pinning the same cursor again
/// restarts the countdown.
pub const DEFAULT_CURSOR_TTL: Duration = Duration::from_secs(5 * 60);
24
/// Upper bound on how many cursor bindings are held at once, which caps the
/// memory affinity can consume (NFR-P).
pub const DEFAULT_CAPACITY: usize = 100 * 1_000;
27
/// Cursor-affinity mode: whether follow-up scroll/PIT requests are pinned to
/// the cluster that created the cursor (`docs/03` §6).
///
/// Opt-in: the default is [`Affinity::Off`], so deployments that never use
/// cursors carry no affinity state.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum Affinity {
    /// Affinity disabled; cursor requests take the normal resolution path.
    #[default]
    Off,
    /// Affinity enabled; each cursor's follow-ups go to the cluster that
    /// created it.
    Pin,
}
39
40/// One cursor's binding: the cluster that owns it and when it was pinned.
41#[derive(Clone, Debug)]
42struct Pinned {
43    cluster: ClusterId,
44    pinned_at: Instant,
45}
46
47/// A bounded, TTL'd map from cursor id to the cluster that created it
48/// (`docs/03` §6). Cloneable handles are not provided; wrap in an `Arc` to share.
49pub struct CursorAffinity {
50    clock: Arc<dyn Clock>,
51    ttl: Duration,
52    capacity: usize,
53    entries: Mutex<HashMap<String, Pinned>>,
54}
55
56impl std::fmt::Debug for CursorAffinity {
57    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58        // The injected `Clock` is not `Debug`; the rest is the useful shape.
59        f.debug_struct("CursorAffinity")
60            .field("ttl", &self.ttl)
61            .field("capacity", &self.capacity)
62            .field("live", &self.len())
63            .finish_non_exhaustive()
64    }
65}
66
67impl CursorAffinity {
68    /// Builds a cursor-affinity map with the given binding TTL and capacity,
69    /// reading time from the system clock.
70    #[must_use]
71    pub fn new(ttl: Duration, capacity: usize) -> Self {
72        Self {
73            clock: Arc::new(SystemClock),
74            ttl,
75            capacity: capacity.max(1),
76            entries: Mutex::new(HashMap::new()),
77        }
78    }
79
80    /// Swaps the clock that drives expiry (tests inject a `ManualClock`).
81    #[must_use]
82    pub fn with_clock(mut self, clock: Arc<dyn Clock>) -> Self {
83        self.clock = clock;
84        self
85    }
86
87    /// Records that `cursor_id` lives on `cluster`. Expired bindings are swept
88    /// first; if the map is still at capacity, the oldest binding is evicted so
89    /// the new one fits (bounded memory).
90    pub fn pin(&self, cursor_id: impl Into<String>, cluster: ClusterId) {
91        let now = self.clock.now();
92        let mut entries = self.lock();
93        entries.retain(|_, p| !self.is_expired(p, now));
94        if entries.len() >= self.capacity {
95            if let Some(oldest) = entries
96                .iter()
97                .min_by_key(|(_, p)| p.pinned_at)
98                .map(|(k, _)| k.clone())
99            {
100                entries.remove(&oldest);
101            }
102        }
103        entries.insert(
104            cursor_id.into(),
105            Pinned {
106                cluster,
107                pinned_at: now,
108            },
109        );
110    }
111
112    /// The cluster `cursor_id` is pinned to, or `None` if it is unknown or its
113    /// binding has expired (lazy expiry, a stale binding is never returned).
114    #[must_use]
115    pub fn resolve(&self, cursor_id: &str) -> Option<ClusterId> {
116        let now = self.clock.now();
117        let entries = self.lock();
118        entries
119            .get(cursor_id)
120            .filter(|p| !self.is_expired(p, now))
121            .map(|p| p.cluster.clone())
122    }
123
124    /// Drops `cursor_id`'s binding (e.g. on an explicit clear-scroll / close-PIT).
125    pub fn release(&self, cursor_id: &str) {
126        self.lock().remove(cursor_id);
127    }
128
129    /// The number of bindings currently held (including any not yet swept).
130    #[must_use]
131    pub fn len(&self) -> usize {
132        self.lock().len()
133    }
134
135    /// Whether no bindings are held.
136    #[must_use]
137    pub fn is_empty(&self) -> bool {
138        self.lock().is_empty()
139    }
140
141    /// Whether a binding pinned at `p.pinned_at` is past its TTL at `now`.
142    fn is_expired(&self, p: &Pinned, now: Instant) -> bool {
143        now.saturating_duration_since(p.pinned_at) >= self.ttl
144    }
145
146    /// Locks the map, recovering a poisoned lock, it is plain cache data with no
147    /// invariant a panicking holder could tear (NFR-R1).
148    fn lock(&self) -> std::sync::MutexGuard<'_, HashMap<String, Pinned>> {
149        self.entries
150            .lock()
151            .unwrap_or_else(std::sync::PoisonError::into_inner)
152    }
153}