Skip to main content

omnigraph_server/
workload.rs

1//! Per-actor admission control for the HTTP server (MR-686 §VII.A).
2//!
3//! The HTTP server's previous global `RwLock<Omnigraph>` serialized every
4//! mutating request across all actors. PR 2 removes that lock — engine
5//! APIs are now `&self`, so concurrent calls from different actors can
6//! run against `Arc<Omnigraph>` simultaneously. Without admission
7//! control, one heavy actor can exhaust shared capacity (Lance I/O
8//! threads, manifest churn, network) and starve other actors.
9//!
10//! This module provides:
11//!
12//! - **Per-actor in-flight count cap**: each actor has a
13//!   `tokio::sync::Semaphore` with `OMNIGRAPH_PER_ACTOR_INFLIGHT_MAX`
14//!   permits (default 16). `try_acquire_owned()` returns `Err` when
15//!   exhausted; the server maps this to HTTP 429.
16//!
17//! - **Per-actor in-flight byte budget**: each actor accumulates an
18//!   `AtomicU64` byte estimate. `fetch_add(est_bytes)` then a check
19//!   against `byte_cap` is race-free via decrement-on-rejection. The
20//!   server maps an over-budget result to HTTP 429 as well.
21//!
22//! Counts are governed by the semaphore (race-free `try_acquire_owned()`
23//! enforces the cap atomically); bytes use `fetch_add` + decrement-on-
24//! rejection. Both checks are atomic compare-and-act, never
25//! load-then-act — the test
26//! `actor_admission_race_does_not_exceed_cap` pins this contract by
27//! spawning 32 concurrent `try_admit` calls against a cap of 16 and
28//! asserting exactly 16 succeed.
29//!
30//! Acquisition order against the engine's per-(table, branch) write
31//! queue: admission FIRST (the HTTP handler reserves capacity before
32//! calling into the engine), engine queue SECOND (acquired inside
33//! `MutationStaging::commit_all`). This composes cleanly because
34//! admission is a single per-actor count + budget check, never
35//! cross-actor; nothing the engine does can change a peer actor's
36//! admission state.
37
38use std::sync::Arc;
39use std::sync::atomic::{AtomicU64, Ordering};
40
41use dashmap::DashMap;
42use tokio::sync::{OwnedSemaphorePermit, Semaphore, TryAcquireError};
43
44/// Default per-actor in-flight count cap. Override via
45/// `OMNIGRAPH_PER_ACTOR_INFLIGHT_MAX`.
46pub const DEFAULT_PER_ACTOR_INFLIGHT_MAX: u32 = 16;
47
48/// Default per-actor in-flight byte budget (4 GiB). Override via
49/// `OMNIGRAPH_PER_ACTOR_BYTES_MAX`.
50pub const DEFAULT_PER_ACTOR_BYTES_MAX: u64 = 4 * 1024 * 1024 * 1024;
51
52/// Why a `try_admit` call returned `Err`. The server maps each variant
53/// to a specific HTTP response code; see `WorkloadController` docs.
54#[derive(Debug, Clone, PartialEq, Eq)]
55pub enum RejectReason {
56    /// Actor exceeded the per-actor in-flight count cap. HTTP 429.
57    InFlightCountExceeded { cap: u32 },
58    /// Actor exceeded the per-actor in-flight byte budget. HTTP 429.
59    ByteBudgetExceeded { cap: u64, attempted: u64 },
60}
61
62impl std::fmt::Display for RejectReason {
63    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
64        match self {
65            RejectReason::InFlightCountExceeded { cap } => {
66                write!(f, "actor in-flight count cap {} exceeded", cap)
67            }
68            RejectReason::ByteBudgetExceeded { cap, attempted } => write!(
69                f,
70                "actor byte budget exceeded: would use {} bytes against cap {}",
71                attempted, cap
72            ),
73        }
74    }
75}
76
77/// Per-actor counters. One instance per actor_id, lazily created on
78/// first admission attempt.
79#[derive(Debug)]
80pub(crate) struct ActorState {
81    /// Counts the number of concurrent in-flight requests for this
82    /// actor. `try_acquire_owned()` is the count-cap gate.
83    in_flight_sem: Arc<Semaphore>,
84    /// Total bytes estimated to be in flight for this actor across
85    /// concurrent requests. `fetch_add` + check + decrement-on-failure
86    /// keeps the cap atomic.
87    bytes: AtomicU64,
88    /// Per-actor byte cap (snapshot of `WorkloadController.byte_cap`
89    /// at construction; cap mutations don't propagate to existing
90    /// ActorStates by design — controller config changes apply on
91    /// next ActorState construction).
92    byte_cap: u64,
93    /// Per-actor count cap (same snapshot semantics as `byte_cap`).
94    inflight_cap: u32,
95}
96
97impl ActorState {
98    fn new(inflight_cap: u32, byte_cap: u64) -> Self {
99        Self {
100            in_flight_sem: Arc::new(Semaphore::new(inflight_cap as usize)),
101            bytes: AtomicU64::new(0),
102            byte_cap,
103            inflight_cap,
104        }
105    }
106}
107
108/// Server-side per-actor admission controller. Constructed once at
109/// server startup and shared via `Arc<WorkloadController>` on
110/// `AppState`.
111pub struct WorkloadController {
112    per_actor: DashMap<Arc<str>, Arc<ActorState>>,
113    inflight_cap: u32,
114    byte_cap: u64,
115}
116
117impl WorkloadController {
118    /// Construct from explicit caps. Tests can override.
119    pub fn new(inflight_cap: u32, byte_cap: u64) -> Self {
120        Self {
121            per_actor: DashMap::new(),
122            inflight_cap,
123            byte_cap,
124        }
125    }
126
127    /// Construct from environment variables, falling back to defaults.
128    /// Bad env values fall back to the default with a `tracing::warn!`.
129    pub fn from_env() -> Self {
130        let inflight_cap = parse_env_u32(
131            "OMNIGRAPH_PER_ACTOR_INFLIGHT_MAX",
132            DEFAULT_PER_ACTOR_INFLIGHT_MAX,
133        );
134        let byte_cap = parse_env_u64("OMNIGRAPH_PER_ACTOR_BYTES_MAX", DEFAULT_PER_ACTOR_BYTES_MAX);
135        Self::new(inflight_cap, byte_cap)
136    }
137
138    /// Construct with default caps. Suitable for tests / single-tenant
139    /// deployments without explicit configuration.
140    pub fn with_defaults() -> Self {
141        Self::new(DEFAULT_PER_ACTOR_INFLIGHT_MAX, DEFAULT_PER_ACTOR_BYTES_MAX)
142    }
143
144    fn actor_state(&self, actor_id: &Arc<str>) -> Arc<ActorState> {
145        if let Some(existing) = self.per_actor.get(actor_id) {
146            return existing.clone();
147        }
148        // Race-on-construct is benign: DashMap's `entry().or_insert_with`
149        // serializes per-key construction; the loser's freshly-built
150        // ActorState gets dropped without observable effect.
151        self.per_actor
152            .entry(actor_id.clone())
153            .or_insert_with(|| Arc::new(ActorState::new(self.inflight_cap, self.byte_cap)))
154            .clone()
155    }
156
157    /// Reserve admission for one in-flight request from `actor_id`
158    /// estimated to consume `est_bytes`. Returns an `AdmissionGuard`
159    /// that releases the count permit + decrements the byte total
160    /// when dropped.
161    ///
162    /// On rejection, the byte counter is decremented before returning
163    /// — callers can retry without leaking budget.
164    pub fn try_admit(
165        &self,
166        actor_id: &Arc<str>,
167        est_bytes: u64,
168    ) -> Result<AdmissionGuard, RejectReason> {
169        let state = self.actor_state(actor_id);
170
171        // Count gate: race-free via `try_acquire_owned()`. If exhausted,
172        // immediately reject — no byte accounting needed for this request.
173        let permit = match Arc::clone(&state.in_flight_sem).try_acquire_owned() {
174            Ok(permit) => permit,
175            Err(TryAcquireError::NoPermits) => {
176                return Err(RejectReason::InFlightCountExceeded {
177                    cap: state.inflight_cap,
178                });
179            }
180            Err(TryAcquireError::Closed) => {
181                return Err(RejectReason::InFlightCountExceeded {
182                    cap: state.inflight_cap,
183                });
184            }
185        };
186
187        // Byte gate: atomic fetch_add then check; decrement on overflow.
188        // `Ordering::SeqCst` is conservative; per-actor accounting is
189        // not on the hot path of read queries.
190        let prior = state.bytes.fetch_add(est_bytes, Ordering::SeqCst);
191        let attempted = prior.saturating_add(est_bytes);
192        if attempted > state.byte_cap {
193            // Roll back the byte add. The permit drops with `permit`
194            // going out of scope below.
195            state.bytes.fetch_sub(est_bytes, Ordering::SeqCst);
196            return Err(RejectReason::ByteBudgetExceeded {
197                cap: state.byte_cap,
198                attempted,
199            });
200        }
201
202        Ok(AdmissionGuard {
203            _permit: permit,
204            actor_state: state,
205            est_bytes,
206        })
207    }
208}
209
210/// Drop-on-completion guard for an admitted request. Dropping releases
211/// the in-flight count permit (via `Drop` on the underlying semaphore
212/// permit) and decrements the actor's byte counter.
213#[derive(Debug)]
214pub struct AdmissionGuard {
215    _permit: OwnedSemaphorePermit,
216    actor_state: Arc<ActorState>,
217    est_bytes: u64,
218}
219
220impl Drop for AdmissionGuard {
221    fn drop(&mut self) {
222        self.actor_state
223            .bytes
224            .fetch_sub(self.est_bytes, Ordering::SeqCst);
225    }
226}
227
228fn parse_env_u32(name: &str, default: u32) -> u32 {
229    match std::env::var(name) {
230        Ok(v) => v.parse::<u32>().unwrap_or_else(|err| {
231            tracing::warn!(
232                env = name,
233                value = %v,
234                error = %err,
235                default,
236                "invalid env value, using default"
237            );
238            default
239        }),
240        Err(_) => default,
241    }
242}
243
244fn parse_env_u64(name: &str, default: u64) -> u64 {
245    match std::env::var(name) {
246        Ok(v) => v.parse::<u64>().unwrap_or_else(|err| {
247            tracing::warn!(
248                env = name,
249                value = %v,
250                error = %err,
251                default,
252                "invalid env value, using default"
253            );
254            default
255        }),
256        Err(_) => default,
257    }
258}
259
260#[cfg(test)]
261mod tests {
262    use super::*;
263
264    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
265    async fn try_admit_admits_under_cap() {
266        let controller = WorkloadController::new(2, 1024);
267        let actor: Arc<str> = "alice".into();
268        let g1 = controller.try_admit(&actor, 100).expect("first admit");
269        let _g2 = controller.try_admit(&actor, 100).expect("second admit");
270        let err = controller
271            .try_admit(&actor, 100)
272            .expect_err("third should reject on count");
273        assert!(matches!(
274            err,
275            RejectReason::InFlightCountExceeded { cap: 2 }
276        ));
277        drop(g1);
278        // After drop, a new admit succeeds again.
279        let _g3 = controller.try_admit(&actor, 100).expect("admit after drop");
280    }
281
282    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
283    async fn byte_budget_caps_admission() {
284        let controller = WorkloadController::new(16, 1000);
285        let actor: Arc<str> = "alice".into();
286        let _g1 = controller.try_admit(&actor, 600).expect("first admit");
287        let err = controller
288            .try_admit(&actor, 600)
289            .expect_err("second should reject on bytes");
290        match err {
291            RejectReason::ByteBudgetExceeded { cap, attempted } => {
292                assert_eq!(cap, 1000);
293                assert_eq!(attempted, 1200);
294            }
295            other => panic!("expected ByteBudgetExceeded, got {:?}", other),
296        }
297        // Verify the byte counter was rolled back: a smaller request fits.
298        let _g2 = controller.try_admit(&actor, 300).expect("smaller admit");
299    }
300
301    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
302    async fn actor_admission_race_does_not_exceed_cap() {
303        // Pin master plan §"WorkloadController" Finding 6: independent
304        // atomic load + check + add allows two concurrent callers to
305        // both pass a cap-N check. The Semaphore-based gate is
306        // race-free — exactly cap_count callers succeed.
307        //
308        // Each task holds its admission guard until released via a
309        // oneshot channel; this forces real contention because guards
310        // can't drop and free permits before all 32 calls have raced.
311        let controller = Arc::new(WorkloadController::new(16, u64::MAX / 4));
312        let actor: Arc<str> = "racer".into();
313
314        let (release_tx, _) = tokio::sync::broadcast::channel::<()>(1);
315
316        let mut handles = Vec::with_capacity(32);
317        for _ in 0..32 {
318            let controller = Arc::clone(&controller);
319            let actor = actor.clone();
320            let mut release_rx = release_tx.subscribe();
321            handles.push(tokio::spawn(async move {
322                let result = controller.try_admit(&actor, 1);
323                let success = result.is_ok();
324                // Hold the guard (if any) until the test signals release,
325                // so the cap-16 contention is observable across all 32
326                // tasks instead of permits being recycled task-by-task.
327                let _guard = result.ok();
328                let _ = release_rx.recv().await;
329                success
330            }));
331        }
332
333        // Give all 32 tasks a chance to hit `try_admit` before any can
334        // drop their guard. 50ms is plenty for tokio's scheduler on a
335        // 4-worker runtime.
336        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
337        // Release every task; collect succeed/reject counts.
338        let _ = release_tx.send(());
339
340        let mut accepted = 0u32;
341        let mut rejected = 0u32;
342        for h in handles {
343            if h.await.unwrap() {
344                accepted += 1;
345            } else {
346                rejected += 1;
347            }
348        }
349        assert_eq!(accepted, 16, "expected exactly 16 successful admits");
350        assert_eq!(rejected, 16, "expected exactly 16 rejections");
351    }
352
353    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
354    async fn per_actor_caps_independent() {
355        let controller = WorkloadController::new(1, 1024);
356        let alice: Arc<str> = "alice".into();
357        let bob: Arc<str> = "bob".into();
358        let _ga = controller.try_admit(&alice, 100).expect("alice ok");
359        // Alice over count cap, Bob unaffected.
360        let err = controller
361            .try_admit(&alice, 100)
362            .expect_err("alice rejected");
363        assert!(matches!(err, RejectReason::InFlightCountExceeded { .. }));
364        let _gb = controller.try_admit(&bob, 100).expect("bob ok");
365    }
366}