astrid_capsule/memory_ledger.rs
1//! Shared per-principal peak-memory accounting ledger + the per-Store limiter
2//! that feeds it.
3//!
4//! Records the high-water linear-memory size each invoking principal grows a
5//! Store to. Like [`FuelLedger`](crate::FuelLedger) the [`MemoryLedger`] is
6//! kernel-owned and cloned into every `WasmEngine`, so a principal's peak is the
7//! max across every capsule it drives — the substrate that fills
8//! `ResourceUsage::memory_bytes_peak_total`.
9//!
10//! **Attribution under pooling.** A capsule's pooled Stores are shared across
11//! principals under free checkout, and grown linear memory persists across
12//! leases (wasmtime cannot shrink a linear memory). The attributable signal is
13//! therefore "the largest memory any of this principal's invocations GREW a
14//! Store to": the principal that caused the growth owns the peak; one that
15//! reuses an already-grown Store without growing is not charged (memory growth
16//! is the only event the limiter sees). For a run-loop capsule's dedicated
17//! Store the attributee is the owner, set once.
18//!
19//! KNOWN IMPRECISION (telemetry-only, no deny path): growth records the
20//! ABSOLUTE new size, so a principal that grows an already-grown pooled Store —
21//! even by one page — is attributed that absolute high-water, which may include
22//! linear memory a *prior* leaseholder allocated. The peak is thus an upper
23//! bound on a principal's own footprint: never below its true peak, never above
24//! its `max_memory_bytes` ceiling, but possibly inflated by inherited pooled
25//! memory. Acceptable while this is operator-facing telemetry; revisit (e.g.
26//! per-lease baseline deltas) before it ever gates a budget decision.
27//!
//! **Concurrency + growth.** Same shape as [`FuelLedger`](crate::FuelLedger): a
//! sharded [`DashMap`] of per-principal [`AtomicU64`], so once a principal has an
//! entry, concurrent invocations record under only a shard read guard plus an
//! atomic max. One entry per distinct principal, capped at
31//! [`MAX_PRINCIPALS`] (the `astrid#827` lesson, since this map gains no deny
32//! path that would prune it): at capacity a new principal evicts the
33//! lowest-peak entry — but only if it is itself a bigger user — so a flood of
34//! ephemeral sub-agent principals cannot grow the map without limit, and the
35//! biggest memory users (the interesting telemetry) are the ones retained.
36
37use std::sync::Arc;
38use std::sync::atomic::{AtomicU64, Ordering};
39
40use astrid_core::PrincipalId;
41use dashmap::DashMap;
42
/// Cap on distinct principals tracked by a [`MemoryLedger`].
///
/// A flood of ephemeral sub-agent principals must not grow the ledger without
/// bound — the lesson of the `chain_locks` / fuel-ledger churn (`astrid#827`).
/// When the ledger is full, a *new* principal is admitted only if its peak is
/// strictly above the lowest recorded peak; that lowest entry is then evicted
/// to make room. Otherwise the newcomer is dropped. The ledger therefore keeps
/// the biggest memory users (the interesting telemetry) instead of growing or
/// dropping arbitrarily. Sized generously: real deployments have far fewer
/// concurrent principals, so the cap only bites under adversarial ephemeral
/// churn.
const MAX_PRINCIPALS: usize = 4096;
52
53/// Shared, cloneable handle to the per-principal peak-memory ledger.
54///
55/// Cloning is an `Arc` bump; every clone observes the same map, so the kernel
56/// hands one clone to each `WasmEngine` and they all record into the same
57/// per-principal high-water marks.
58#[derive(Clone, Default)]
59pub struct MemoryLedger {
60 inner: Arc<DashMap<PrincipalId, AtomicU64>>,
61}
62
63impl MemoryLedger {
64 /// Raise `principal`'s recorded peak to `bytes` if it exceeds the current
65 /// high-water mark (else no-op).
66 ///
67 /// Lock-free in steady state: once a principal has an entry the common path
68 /// takes a shard *read* guard and a `Relaxed` compare-exchange max. Only the
69 /// first observation of a never-seen principal takes a shard write guard.
70 /// `Relaxed` is correct — a monotonic high-water mark with no ordering
71 /// dependency on other memory.
72 pub fn record_peak(&self, principal: &PrincipalId, bytes: u64) {
73 if bytes == 0 {
74 return;
75 }
76 if let Some(counter) = self.inner.get(principal) {
77 Self::raise_to(&counter, bytes);
78 return;
79 }
80 // New principal. Bound the map (`astrid#827` lesson): if at capacity,
81 // evict the lowest-peak entry — but ONLY if this newcomer's peak is
82 // strictly above it. Evicting a bigger user to record a smaller one
83 // would defeat the goal of keeping the biggest memory users (the
84 // interesting telemetry) and let a flood of small, ephemeral
85 // sub-agent principals thrash the real ones out. If the newcomer is
86 // not bigger, drop it. A benign race may let the size briefly exceed
87 // the cap under concurrent new inserts — bounded by the number of
88 // concurrent inserters, never unbounded.
89 if self.inner.len() >= MAX_PRINCIPALS && !self.evict_lowest_if_below(bytes) {
90 return;
91 }
92 Self::raise_to(&self.inner.entry(principal.clone()).or_default(), bytes);
93 }
94
95 /// At capacity, remove the entry with the smallest recorded peak — but only
96 /// when `threshold` exceeds it, so a smaller newcomer never displaces a
97 /// bigger user. Returns `true` if there is now room to insert (an entry was
98 /// evicted, or — racing — the map dipped empty), `false` if the newcomer
99 /// should be dropped.
100 ///
101 /// `O(n)` over the map, but only on the rare new-principal-while-at-capacity
102 /// path; `n` is bounded by [`MAX_PRINCIPALS`] and each probe is a `Relaxed`
103 /// load, so the scan is microseconds. The iterator's shard guards are
104 /// dropped before the `remove`, so it cannot deadlock against a concurrent
105 /// `record_peak`.
106 fn evict_lowest_if_below(&self, threshold: u64) -> bool {
107 let mut victim: Option<PrincipalId> = None;
108 let mut lowest = u64::MAX;
109 for entry in &*self.inner {
110 let peak = entry.value().load(Ordering::Relaxed);
111 if peak <= lowest {
112 lowest = peak;
113 victim = Some(entry.key().clone());
114 }
115 }
116 let Some(key) = victim else {
117 // Map raced empty — there is room.
118 return true;
119 };
120 if threshold <= lowest {
121 // The newcomer is no bigger than our smallest user; keep the
122 // bigger one and drop the newcomer.
123 return false;
124 }
125 self.inner.remove(&key);
126 true
127 }
128
129 /// Relaxed atomic max: raise `counter` to `bytes` if larger. The closure
130 /// returns `None` (no write) when `bytes` is not larger, so `fetch_update`
131 /// returns `Err` and we ignore it — lock-free and uncontended per principal.
132 fn raise_to(counter: &AtomicU64, bytes: u64) {
133 let _ = counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |v| {
134 (bytes > v).then_some(bytes)
135 });
136 }
137
138 /// Read `principal`'s peak linear-memory high-water mark in bytes, or `0` if
139 /// it has never grown a Store. A snapshot via a shard read guard + a single
140 /// `Relaxed` load, the same ordering the record path uses.
141 #[must_use]
142 pub fn peak(&self, principal: &PrincipalId) -> u64 {
143 self.inner
144 .get(principal)
145 .map_or(0, |counter| counter.load(Ordering::Relaxed))
146 }
147}
148
149/// Per-Store memory limiter: enforces the per-invocation byte ceiling **and**
150/// records the invoking principal's peak into the shared [`MemoryLedger`].
151///
152/// Replaces a plain `wasmtime::StoreLimits` as the `HostState` limiter field. A
153/// pooled Store is leased by different principals, so the ceiling and the
154/// attributee are re-targeted per invocation via [`set`](Self::set); for a
155/// run-loop's dedicated Store they are set once to the owner at build.
156pub struct StoreMemoryMeter {
157 /// Linear-memory byte ceiling for the current invocation (the principal's
158 /// `max_memory_bytes` quota). A grow beyond it is denied — the same cap the
159 /// old `StoreLimits::memory_size` enforced.
160 max_memory_bytes: usize,
161 /// Principal to attribute growth to (the invoking principal; the owner for a
162 /// run-loop's dedicated Store).
163 principal: PrincipalId,
164 /// Shared peak ledger.
165 ledger: MemoryLedger,
166}
167
168impl StoreMemoryMeter {
169 /// Build a meter capped at `max_memory_bytes`, attributing growth to
170 /// `principal`, recording into `ledger`.
171 #[must_use]
172 pub fn new(max_memory_bytes: usize, principal: PrincipalId, ledger: MemoryLedger) -> Self {
173 Self {
174 max_memory_bytes,
175 principal,
176 ledger,
177 }
178 }
179
180 /// Re-target for a new invocation: the principal's memory ceiling and the
181 /// principal to attribute peak growth to. Called at invocation SET, since a
182 /// pooled Store crosses principals.
183 pub fn set(&mut self, max_memory_bytes: usize, principal: PrincipalId) {
184 self.max_memory_bytes = max_memory_bytes;
185 self.principal = principal;
186 }
187}
188
189impl wasmtime::ResourceLimiter for StoreMemoryMeter {
190 fn memory_growing(
191 &mut self,
192 _current: usize,
193 desired: usize,
194 maximum: Option<usize>,
195 ) -> wasmtime::Result<bool> {
196 // Enforce the per-invocation byte ceiling (what `StoreLimits` did).
197 if desired > self.max_memory_bytes {
198 return Ok(false);
199 }
200 if let Some(max) = maximum
201 && desired > max
202 {
203 return Ok(false);
204 }
205 // Attribute the new high-water size to the invoking principal.
206 self.ledger
207 .record_peak(&self.principal, u64::try_from(desired).unwrap_or(u64::MAX));
208 Ok(true)
209 }
210
211 fn table_growing(
212 &mut self,
213 _current: usize,
214 _desired: usize,
215 _maximum: Option<usize>,
216 ) -> wasmtime::Result<bool> {
217 // Tables are unbounded here, matching the prior `StoreLimits` (which set
218 // only `memory_size`). Only linear memory is metered.
219 Ok(true)
220 }
221}
222
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a named principal. Every name used in these tests is valid.
    fn named(name: impl Into<String>) -> PrincipalId {
        PrincipalId::new(name.into()).unwrap()
    }

    #[test]
    fn record_peak_keeps_the_high_water_mark() {
        let ledger = MemoryLedger::default();
        let who = PrincipalId::default();
        assert_eq!(ledger.peak(&who), 0);

        // (observation, expected peak afterwards): first sighting sets it, a
        // lower value leaves it, a higher one raises it, zero is ignored.
        let steps = [(1000, 1000), (500, 1000), (4096, 4096), (0, 4096)];
        for (observed, expected) in steps {
            ledger.record_peak(&who, observed);
            assert_eq!(ledger.peak(&who), expected);
        }
    }

    #[test]
    fn ledger_is_per_principal_and_shared_across_clones() {
        let original = MemoryLedger::default();
        let alice = named("alice");
        let bob = named("bob");

        original.record_peak(&alice, 2048);
        // Record through a clone: both handles share one Arc-backed map.
        let shared = original.clone();
        shared.record_peak(&bob, 8192);

        assert_eq!(original.peak(&alice), 2048);
        assert_eq!(original.peak(&bob), 8192);
        assert_eq!(shared.peak(&alice), 2048);
    }

    #[test]
    fn ledger_is_bounded_and_evicts_the_lowest_peak() {
        let ledger = MemoryLedger::default();
        // Fill to capacity with distinct peaks: `p{i}` records `i + 1`, so
        // the lowest entry is `p0` at 1.
        for (i, peak) in (0..MAX_PRINCIPALS).zip(1u64..) {
            ledger.record_peak(&named(format!("p{i}")), peak);
        }
        assert_eq!(ledger.inner.len(), MAX_PRINCIPALS);
        let p0 = named("p0");
        assert_eq!(ledger.peak(&p0), 1);

        // A brand-new, much bigger principal displaces `p0`, and the map
        // stays bounded.
        let big = named("newcomer");
        ledger.record_peak(&big, 1_000_000);
        assert!(ledger.inner.len() <= MAX_PRINCIPALS, "stays bounded");
        assert_eq!(ledger.peak(&big), 1_000_000, "newcomer recorded");
        assert_eq!(ledger.peak(&p0), 0, "lowest-peak principal evicted");

        // `p1` (peak 2) is now the smallest retained user. A newcomer that
        // merely ties it must be dropped rather than evict it. Otherwise a
        // flood of small ephemeral principals could thrash out the real users.
        let p1 = named("p1");
        assert_eq!(ledger.peak(&p1), 2, "p1 is now the lowest retained user");
        let tie = named("smaller");
        ledger.record_peak(&tie, 2);
        assert_eq!(
            ledger.peak(&tie),
            0,
            "smaller newcomer dropped, not recorded"
        );
        assert_eq!(ledger.peak(&p1), 2, "existing bigger user retained");
    }

    #[test]
    fn meter_enforces_ceiling_and_records_peak() {
        use wasmtime::ResourceLimiter;

        let ledger = MemoryLedger::default();
        let carol = named("carol");
        let mut meter = StoreMemoryMeter::new(64 * 1024, carol.clone(), ledger.clone());

        // Inside the ceiling: the grow is allowed and becomes carol's peak.
        assert!(meter.memory_growing(0, 16 * 1024, None).unwrap());
        assert_eq!(ledger.peak(&carol), 16 * 1024);

        // A further grow raises the peak.
        assert!(meter.memory_growing(16 * 1024, 48 * 1024, None).unwrap());
        assert_eq!(ledger.peak(&carol), 48 * 1024);

        // Past the ceiling: refused, and the peak does not move.
        assert!(!meter.memory_growing(48 * 1024, 128 * 1024, None).unwrap());
        assert_eq!(ledger.peak(&carol), 48 * 1024);

        // Re-targeting to dave with a larger cap charges dave; carol's peak
        // is untouched.
        let dave = named("dave");
        meter.set(256 * 1024, dave.clone());
        assert!(meter.memory_growing(0, 200 * 1024, None).unwrap());
        assert_eq!(ledger.peak(&dave), 200 * 1024);
        assert_eq!(ledger.peak(&carol), 48 * 1024);
    }
}