omnigraph_server/workload.rs
1//! Per-actor admission control for the HTTP server (MR-686 §VII.A).
2//!
3//! The HTTP server's previous global `RwLock<Omnigraph>` serialized every
4//! mutating request across all actors. PR 2 removes that lock — engine
5//! APIs are now `&self`, so concurrent calls from different actors can
6//! run against `Arc<Omnigraph>` simultaneously. Without admission
7//! control, one heavy actor can exhaust shared capacity (Lance I/O
8//! threads, manifest churn, network) and starve other actors.
9//!
10//! This module provides:
11//!
12//! - **Per-actor in-flight count cap**: each actor has a
13//! `tokio::sync::Semaphore` with `OMNIGRAPH_PER_ACTOR_INFLIGHT_MAX`
14//! permits (default 16). `try_acquire_owned()` returns `Err` when
15//! exhausted; the server maps this to HTTP 429.
16//!
17//! - **Per-actor in-flight byte budget**: each actor accumulates an
18//! `AtomicU64` byte estimate. `fetch_add(est_bytes)` then a check
19//! against `byte_cap` is race-free via decrement-on-rejection. The
20//! server maps an over-budget result to HTTP 429 as well.
21//!
22//! Counts are governed by the semaphore (race-free `try_acquire_owned()`
23//! enforces the cap atomically); bytes use `fetch_add` + decrement-on-
24//! rejection. Both checks are atomic compare-and-act, never
25//! load-then-act — the test
26//! `actor_admission_race_does_not_exceed_cap` pins this contract by
27//! spawning 32 concurrent `try_admit` calls against a cap of 16 and
28//! asserting exactly 16 succeed.
29//!
30//! Acquisition order against the engine's per-(table, branch) write
31//! queue: admission FIRST (the HTTP handler reserves capacity before
32//! calling into the engine), engine queue SECOND (acquired inside
33//! `MutationStaging::commit_all`). This composes cleanly because
34//! admission is a single per-actor count + budget check, never
35//! cross-actor; nothing the engine does can change a peer actor's
36//! admission state.
37
38use std::sync::Arc;
39use std::sync::atomic::{AtomicU64, Ordering};
40
41use dashmap::DashMap;
42use tokio::sync::{OwnedSemaphorePermit, Semaphore, TryAcquireError};
43
/// Default per-actor in-flight count cap: the number of requests one
/// actor may have admitted at the same time when no override is given.
///
/// `WorkloadController::from_env` reads the override from the
/// `OMNIGRAPH_PER_ACTOR_INFLIGHT_MAX` environment variable;
/// `WorkloadController::with_defaults` uses this value directly.
pub const DEFAULT_PER_ACTOR_INFLIGHT_MAX: u32 = 16;
47
/// Default per-actor in-flight byte budget: 4 GiB, i.e. 4 × 2^30 bytes.
///
/// `WorkloadController::from_env` reads the override from the
/// `OMNIGRAPH_PER_ACTOR_BYTES_MAX` environment variable;
/// `WorkloadController::with_defaults` uses this value directly.
pub const DEFAULT_PER_ACTOR_BYTES_MAX: u64 = 4 << 30;
51
/// Why a `try_admit` call returned `Err`. The server maps each variant
/// to a specific HTTP response code; see `WorkloadController` docs.
///
/// Implements [`std::fmt::Display`] (human-readable message) and
/// [`std::error::Error`], so it can be propagated with `?` into
/// `Box<dyn Error>`-style error types.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RejectReason {
    /// Actor exceeded the per-actor in-flight count cap. HTTP 429.
    ///
    /// `cap` is the count cap that was in force for the actor.
    InFlightCountExceeded { cap: u32 },
    /// Actor exceeded the per-actor in-flight byte budget. HTTP 429.
    ///
    /// `cap` is the byte budget in force for the actor; `attempted` is
    /// the total the actor would have had in flight had the request
    /// been admitted (saturating at `u64::MAX`).
    ByteBudgetExceeded { cap: u64, attempted: u64 },
}

impl std::fmt::Display for RejectReason {
    /// Render a one-line, human-readable description of the rejection.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            RejectReason::InFlightCountExceeded { cap } => {
                write!(f, "actor in-flight count cap {} exceeded", cap)
            }
            RejectReason::ByteBudgetExceeded { cap, attempted } => write!(
                f,
                "actor byte budget exceeded: would use {} bytes against cap {}",
                attempted, cap
            ),
        }
    }
}

// A rejection is a leaf error: there is no underlying source to expose,
// so the default `Error` methods are sufficient.
impl std::error::Error for RejectReason {}
76
/// Per-actor counters. One instance per actor_id, lazily created on
/// first admission attempt.
///
/// Instances are shared as `Arc<ActorState>`: the controller's map
/// holds one reference and every live `AdmissionGuard` holds another,
/// so the guard can release its reservation on drop.
#[derive(Debug)]
pub(crate) struct ActorState {
    /// Counts the number of concurrent in-flight requests for this
    /// actor. `try_acquire_owned()` is the count-cap gate; the owned
    /// permit it returns is stored in the `AdmissionGuard`.
    in_flight_sem: Arc<Semaphore>,
    /// Total bytes estimated to be in flight for this actor across
    /// concurrent requests. `try_admit` reserves bytes with an atomic
    /// read-modify-write and `AdmissionGuard`'s `Drop` subtracts the
    /// same amount again.
    bytes: AtomicU64,
    /// Per-actor byte cap (snapshot of `WorkloadController.byte_cap`
    /// at construction; cap mutations don't propagate to existing
    /// ActorStates by design — controller config changes apply on
    /// next ActorState construction).
    byte_cap: u64,
    /// Per-actor count cap (same snapshot semantics as `byte_cap`).
    /// Used to size `in_flight_sem` and reported back in
    /// `RejectReason::InFlightCountExceeded`.
    inflight_cap: u32,
}
96
97impl ActorState {
98 fn new(inflight_cap: u32, byte_cap: u64) -> Self {
99 Self {
100 in_flight_sem: Arc::new(Semaphore::new(inflight_cap as usize)),
101 bytes: AtomicU64::new(0),
102 byte_cap,
103 inflight_cap,
104 }
105 }
106}
107
/// Server-side per-actor admission controller. Constructed once at
/// server startup and shared via `Arc<WorkloadController>` on
/// `AppState`.
///
/// Each actor gets its own count cap and byte budget; one actor's
/// usage never affects another actor's admission.
pub struct WorkloadController {
    /// Admission state keyed by actor id, created lazily the first time
    /// an actor calls `try_admit`.
    per_actor: DashMap<Arc<str>, Arc<ActorState>>,
    /// Count cap handed to each newly created `ActorState`.
    inflight_cap: u32,
    /// Byte budget handed to each newly created `ActorState`.
    byte_cap: u64,
}
116
117impl WorkloadController {
118 /// Construct from explicit caps. Tests can override.
119 pub fn new(inflight_cap: u32, byte_cap: u64) -> Self {
120 Self {
121 per_actor: DashMap::new(),
122 inflight_cap,
123 byte_cap,
124 }
125 }
126
127 /// Construct from environment variables, falling back to defaults.
128 /// Bad env values fall back to the default with a `tracing::warn!`.
129 pub fn from_env() -> Self {
130 let inflight_cap = parse_env_u32(
131 "OMNIGRAPH_PER_ACTOR_INFLIGHT_MAX",
132 DEFAULT_PER_ACTOR_INFLIGHT_MAX,
133 );
134 let byte_cap = parse_env_u64("OMNIGRAPH_PER_ACTOR_BYTES_MAX", DEFAULT_PER_ACTOR_BYTES_MAX);
135 Self::new(inflight_cap, byte_cap)
136 }
137
138 /// Construct with default caps. Suitable for tests / single-tenant
139 /// deployments without explicit configuration.
140 pub fn with_defaults() -> Self {
141 Self::new(DEFAULT_PER_ACTOR_INFLIGHT_MAX, DEFAULT_PER_ACTOR_BYTES_MAX)
142 }
143
144 fn actor_state(&self, actor_id: &Arc<str>) -> Arc<ActorState> {
145 if let Some(existing) = self.per_actor.get(actor_id) {
146 return existing.clone();
147 }
148 // Race-on-construct is benign: DashMap's `entry().or_insert_with`
149 // serializes per-key construction; the loser's freshly-built
150 // ActorState gets dropped without observable effect.
151 self.per_actor
152 .entry(actor_id.clone())
153 .or_insert_with(|| Arc::new(ActorState::new(self.inflight_cap, self.byte_cap)))
154 .clone()
155 }
156
157 /// Reserve admission for one in-flight request from `actor_id`
158 /// estimated to consume `est_bytes`. Returns an `AdmissionGuard`
159 /// that releases the count permit + decrements the byte total
160 /// when dropped.
161 ///
162 /// On rejection, the byte counter is decremented before returning
163 /// — callers can retry without leaking budget.
164 pub fn try_admit(
165 &self,
166 actor_id: &Arc<str>,
167 est_bytes: u64,
168 ) -> Result<AdmissionGuard, RejectReason> {
169 let state = self.actor_state(actor_id);
170
171 // Count gate: race-free via `try_acquire_owned()`. If exhausted,
172 // immediately reject — no byte accounting needed for this request.
173 let permit = match Arc::clone(&state.in_flight_sem).try_acquire_owned() {
174 Ok(permit) => permit,
175 Err(TryAcquireError::NoPermits) => {
176 return Err(RejectReason::InFlightCountExceeded {
177 cap: state.inflight_cap,
178 });
179 }
180 Err(TryAcquireError::Closed) => {
181 return Err(RejectReason::InFlightCountExceeded {
182 cap: state.inflight_cap,
183 });
184 }
185 };
186
187 // Byte gate: atomic fetch_add then check; decrement on overflow.
188 // `Ordering::SeqCst` is conservative; per-actor accounting is
189 // not on the hot path of read queries.
190 let prior = state.bytes.fetch_add(est_bytes, Ordering::SeqCst);
191 let attempted = prior.saturating_add(est_bytes);
192 if attempted > state.byte_cap {
193 // Roll back the byte add. The permit drops with `permit`
194 // going out of scope below.
195 state.bytes.fetch_sub(est_bytes, Ordering::SeqCst);
196 return Err(RejectReason::ByteBudgetExceeded {
197 cap: state.byte_cap,
198 attempted,
199 });
200 }
201
202 Ok(AdmissionGuard {
203 _permit: permit,
204 actor_state: state,
205 est_bytes,
206 })
207 }
208}
209
210/// Drop-on-completion guard for an admitted request. Dropping releases
211/// the in-flight count permit (via `Drop` on the underlying semaphore
212/// permit) and decrements the actor's byte counter.
213#[derive(Debug)]
214pub struct AdmissionGuard {
215 _permit: OwnedSemaphorePermit,
216 actor_state: Arc<ActorState>,
217 est_bytes: u64,
218}
219
220impl Drop for AdmissionGuard {
221 fn drop(&mut self) {
222 self.actor_state
223 .bytes
224 .fetch_sub(self.est_bytes, Ordering::SeqCst);
225 }
226}
227
228fn parse_env_u32(name: &str, default: u32) -> u32 {
229 match std::env::var(name) {
230 Ok(v) => v.parse::<u32>().unwrap_or_else(|err| {
231 tracing::warn!(
232 env = name,
233 value = %v,
234 error = %err,
235 default,
236 "invalid env value, using default"
237 );
238 default
239 }),
240 Err(_) => default,
241 }
242}
243
244fn parse_env_u64(name: &str, default: u64) -> u64 {
245 match std::env::var(name) {
246 Ok(v) => v.parse::<u64>().unwrap_or_else(|err| {
247 tracing::warn!(
248 env = name,
249 value = %v,
250 error = %err,
251 default,
252 "invalid env value, using default"
253 );
254 default
255 }),
256 Err(_) => default,
257 }
258}
259
#[cfg(test)]
mod tests {
    use super::*;

    /// The count cap rejects the (cap + 1)-th concurrent request and
    /// frees a slot again once a guard is dropped.
    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn try_admit_admits_under_cap() {
        let controller = WorkloadController::new(2, 1024);
        let actor: Arc<str> = "alice".into();
        let g1 = controller.try_admit(&actor, 100).expect("first admit");
        let _g2 = controller.try_admit(&actor, 100).expect("second admit");
        let err = controller
            .try_admit(&actor, 100)
            .expect_err("third should reject on count");
        assert!(matches!(
            err,
            RejectReason::InFlightCountExceeded { cap: 2 }
        ));
        drop(g1);
        // After drop, a new admit succeeds again.
        let _g3 = controller.try_admit(&actor, 100).expect("admit after drop");
    }

    /// The byte budget rejects a request that would push the total
    /// past the cap, and the rejection does not consume budget.
    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn byte_budget_caps_admission() {
        let controller = WorkloadController::new(16, 1000);
        let actor: Arc<str> = "alice".into();
        let _g1 = controller.try_admit(&actor, 600).expect("first admit");
        let err = controller
            .try_admit(&actor, 600)
            .expect_err("second should reject on bytes");
        match err {
            RejectReason::ByteBudgetExceeded { cap, attempted } => {
                assert_eq!(cap, 1000);
                assert_eq!(attempted, 1200);
            }
            other => panic!("expected ByteBudgetExceeded, got {:?}", other),
        }
        // Verify the rejection left the byte counter at 600: a smaller
        // request fits.
        let _g2 = controller.try_admit(&actor, 300).expect("smaller admit");
    }

    /// An estimate large enough to overflow the counter is rejected
    /// with a saturated `attempted`, and the budget afterwards is
    /// exactly what it was before.
    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn oversized_estimate_is_rejected_and_leaves_budget_intact() {
        let controller = WorkloadController::new(16, 1000);
        let actor: Arc<str> = "alice".into();
        let _held = controller.try_admit(&actor, 600).expect("first admit");

        let err = controller
            .try_admit(&actor, u64::MAX)
            .expect_err("overflowing estimate must be rejected");
        assert_eq!(
            err,
            RejectReason::ByteBudgetExceeded {
                cap: 1000,
                attempted: u64::MAX,
            }
        );

        // 600 in flight: 400 more fits exactly, one more byte does not.
        let _fits = controller.try_admit(&actor, 400).expect("exact fit");
        let err = controller
            .try_admit(&actor, 1)
            .expect_err("budget is now full");
        assert_eq!(
            err,
            RejectReason::ByteBudgetExceeded {
                cap: 1000,
                attempted: 1001,
            }
        );
    }

    /// Dropping a guard returns its bytes to the actor's budget.
    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn dropping_guard_releases_bytes() {
        let controller = WorkloadController::new(16, 1000);
        let actor: Arc<str> = "alice".into();
        let full = controller.try_admit(&actor, 1000).expect("fills budget");
        controller
            .try_admit(&actor, 1)
            .expect_err("budget exhausted while guard is held");
        drop(full);
        let _again = controller
            .try_admit(&actor, 1000)
            .expect("budget restored after drop");
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn actor_admission_race_does_not_exceed_cap() {
        // Pin master plan §"WorkloadController" Finding 6: independent
        // atomic load + check + add allows two concurrent callers to
        // both pass a cap-N check. The Semaphore-based gate is
        // race-free — exactly cap_count callers succeed.
        //
        // Every task keeps its guard (if any) alive until all tasks
        // have reached the barrier, i.e. until every `try_admit` call
        // has returned. No permit can therefore be recycled while
        // another task has yet to attempt admission, which makes the
        // exact-count assertion deterministic rather than dependent on
        // a sleep being "long enough".
        const CAP: u32 = 16;
        const CALLERS: usize = 32;

        let controller = Arc::new(WorkloadController::new(CAP, u64::MAX / 4));
        let actor: Arc<str> = "racer".into();
        let all_attempted = Arc::new(tokio::sync::Barrier::new(CALLERS));

        let mut handles = Vec::with_capacity(CALLERS);
        for _ in 0..CALLERS {
            let controller = Arc::clone(&controller);
            let actor = Arc::clone(&actor);
            let all_attempted = Arc::clone(&all_attempted);
            handles.push(tokio::spawn(async move {
                let guard = controller.try_admit(&actor, 1).ok();
                // Hold the guard across the barrier so the permit
                // stays taken until every caller has tried.
                all_attempted.wait().await;
                guard.is_some()
            }));
        }

        let mut accepted = 0u32;
        let mut rejected = 0u32;
        for handle in handles {
            if handle.await.expect("admission task panicked") {
                accepted += 1;
            } else {
                rejected += 1;
            }
        }
        assert_eq!(accepted, 16, "expected exactly 16 successful admits");
        assert_eq!(rejected, 16, "expected exactly 16 rejections");
    }

    /// One actor hitting its cap does not affect another actor.
    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn per_actor_caps_independent() {
        let controller = WorkloadController::new(1, 1024);
        let alice: Arc<str> = "alice".into();
        let bob: Arc<str> = "bob".into();
        let _ga = controller.try_admit(&alice, 100).expect("alice ok");
        // Alice over count cap, Bob unaffected.
        let err = controller
            .try_admit(&alice, 100)
            .expect_err("alice rejected");
        assert!(matches!(err, RejectReason::InFlightCountExceeded { .. }));
        let _gb = controller.try_admit(&bob, 100).expect("bob ok");
    }
}