Skip to main content

astrid_capsule/
memory_ledger.rs

1//! Shared per-principal peak-memory accounting ledger + the per-Store limiter
2//! that feeds it.
3//!
4//! Records the high-water linear-memory size each invoking principal grows a
5//! Store to. Like [`FuelLedger`](crate::FuelLedger) the [`MemoryLedger`] is
6//! kernel-owned and cloned into every `WasmEngine`, so a principal's peak is the
7//! max across every capsule it drives — the substrate that fills
8//! `ResourceUsage::memory_bytes_peak_total`.
9//!
10//! **Attribution under pooling.** A capsule's pooled Stores are shared across
11//! principals under free checkout, and grown linear memory persists across
12//! leases (wasmtime cannot shrink a linear memory). The attributable signal is
13//! therefore "the largest memory any of this principal's invocations GREW a
14//! Store to": the principal that caused the growth owns the peak; one that
15//! reuses an already-grown Store without growing is not charged (memory growth
16//! is the only event the limiter sees). For a run-loop capsule's dedicated
17//! Store the attributee is the owner, set once.
18//!
19//! KNOWN IMPRECISION (telemetry-only, no deny path): growth records the
20//! ABSOLUTE new size, so a principal that grows an already-grown pooled Store —
21//! even by one page — is attributed that absolute high-water, which may include
22//! linear memory a *prior* leaseholder allocated. The peak is thus an upper
23//! bound on a principal's own footprint: never below its true peak, never above
24//! its `max_memory_bytes` ceiling, but possibly inflated by inherited pooled
25//! memory. Acceptable while this is operator-facing telemetry; revisit (e.g.
26//! per-lease baseline deltas) before it ever gates a budget decision.
27//!
28//! **Concurrency + growth.** Same shape as [`FuelLedger`]: a sharded
29//! [`DashMap`] of per-principal [`AtomicU64`], so concurrent invocations record
30//! lock-free per principal. One entry per distinct principal, capped at
31//! [`MAX_PRINCIPALS`] (the `astrid#827` lesson, since this map gains no deny
32//! path that would prune it): at capacity a new principal evicts the
33//! lowest-peak entry — but only if it is itself a bigger user — so a flood of
34//! ephemeral sub-agent principals cannot grow the map without limit, and the
35//! biggest memory users (the interesting telemetry) are the ones retained.
36
37use std::sync::Arc;
38use std::sync::atomic::{AtomicU64, Ordering};
39
40use astrid_core::PrincipalId;
41use dashmap::DashMap;
42
/// Upper bound on the number of distinct principals the ledger tracks.
///
/// A flood of ephemeral sub-agent principals must not grow the map without
/// limit — the lesson of the `chain_locks` / fuel-ledger churn (`astrid#827`).
/// Once the map is full, recording a *new* principal evicts the entry with the
/// lowest recorded peak, but only when the newcomer's peak is strictly higher;
/// otherwise the newcomer itself is dropped. Either way the ledger keeps the
/// biggest memory users (the interesting telemetry) instead of growing or
/// dropping arbitrarily. Sized generously: real deployments have far fewer
/// concurrent principals, so the cap only bites under adversarial ephemeral
/// churn.
const MAX_PRINCIPALS: usize = 4096;
52
53/// Shared, cloneable handle to the per-principal peak-memory ledger.
54///
55/// Cloning is an `Arc` bump; every clone observes the same map, so the kernel
56/// hands one clone to each `WasmEngine` and they all record into the same
57/// per-principal high-water marks.
58#[derive(Clone, Default)]
59pub struct MemoryLedger {
60    inner: Arc<DashMap<PrincipalId, AtomicU64>>,
61}
62
63impl MemoryLedger {
64    /// Raise `principal`'s recorded peak to `bytes` if it exceeds the current
65    /// high-water mark (else no-op).
66    ///
67    /// Lock-free in steady state: once a principal has an entry the common path
68    /// takes a shard *read* guard and a `Relaxed` compare-exchange max. Only the
69    /// first observation of a never-seen principal takes a shard write guard.
70    /// `Relaxed` is correct — a monotonic high-water mark with no ordering
71    /// dependency on other memory.
72    pub fn record_peak(&self, principal: &PrincipalId, bytes: u64) {
73        if bytes == 0 {
74            return;
75        }
76        if let Some(counter) = self.inner.get(principal) {
77            Self::raise_to(&counter, bytes);
78            return;
79        }
80        // New principal. Bound the map (`astrid#827` lesson): if at capacity,
81        // evict the lowest-peak entry — but ONLY if this newcomer's peak is
82        // strictly above it. Evicting a bigger user to record a smaller one
83        // would defeat the goal of keeping the biggest memory users (the
84        // interesting telemetry) and let a flood of small, ephemeral
85        // sub-agent principals thrash the real ones out. If the newcomer is
86        // not bigger, drop it. A benign race may let the size briefly exceed
87        // the cap under concurrent new inserts — bounded by the number of
88        // concurrent inserters, never unbounded.
89        if self.inner.len() >= MAX_PRINCIPALS && !self.evict_lowest_if_below(bytes) {
90            return;
91        }
92        Self::raise_to(&self.inner.entry(principal.clone()).or_default(), bytes);
93    }
94
95    /// At capacity, remove the entry with the smallest recorded peak — but only
96    /// when `threshold` exceeds it, so a smaller newcomer never displaces a
97    /// bigger user. Returns `true` if there is now room to insert (an entry was
98    /// evicted, or — racing — the map dipped empty), `false` if the newcomer
99    /// should be dropped.
100    ///
101    /// `O(n)` over the map, but only on the rare new-principal-while-at-capacity
102    /// path; `n` is bounded by [`MAX_PRINCIPALS`] and each probe is a `Relaxed`
103    /// load, so the scan is microseconds. The iterator's shard guards are
104    /// dropped before the `remove`, so it cannot deadlock against a concurrent
105    /// `record_peak`.
106    fn evict_lowest_if_below(&self, threshold: u64) -> bool {
107        let mut victim: Option<PrincipalId> = None;
108        let mut lowest = u64::MAX;
109        for entry in &*self.inner {
110            let peak = entry.value().load(Ordering::Relaxed);
111            if peak <= lowest {
112                lowest = peak;
113                victim = Some(entry.key().clone());
114            }
115        }
116        let Some(key) = victim else {
117            // Map raced empty — there is room.
118            return true;
119        };
120        if threshold <= lowest {
121            // The newcomer is no bigger than our smallest user; keep the
122            // bigger one and drop the newcomer.
123            return false;
124        }
125        self.inner.remove(&key);
126        true
127    }
128
129    /// Relaxed atomic max: raise `counter` to `bytes` if larger. The closure
130    /// returns `None` (no write) when `bytes` is not larger, so `fetch_update`
131    /// returns `Err` and we ignore it — lock-free and uncontended per principal.
132    fn raise_to(counter: &AtomicU64, bytes: u64) {
133        let _ = counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |v| {
134            (bytes > v).then_some(bytes)
135        });
136    }
137
138    /// Read `principal`'s peak linear-memory high-water mark in bytes, or `0` if
139    /// it has never grown a Store. A snapshot via a shard read guard + a single
140    /// `Relaxed` load, the same ordering the record path uses.
141    #[must_use]
142    pub fn peak(&self, principal: &PrincipalId) -> u64 {
143        self.inner
144            .get(principal)
145            .map_or(0, |counter| counter.load(Ordering::Relaxed))
146    }
147}
148
149/// Per-Store memory limiter: enforces the per-invocation byte ceiling **and**
150/// records the invoking principal's peak into the shared [`MemoryLedger`].
151///
152/// Replaces a plain `wasmtime::StoreLimits` as the `HostState` limiter field. A
153/// pooled Store is leased by different principals, so the ceiling and the
154/// attributee are re-targeted per invocation via [`set`](Self::set); for a
155/// run-loop's dedicated Store they are set once to the owner at build.
156pub struct StoreMemoryMeter {
157    /// Linear-memory byte ceiling for the current invocation (the principal's
158    /// `max_memory_bytes` quota). A grow beyond it is denied — the same cap the
159    /// old `StoreLimits::memory_size` enforced.
160    max_memory_bytes: usize,
161    /// Principal to attribute growth to (the invoking principal; the owner for a
162    /// run-loop's dedicated Store).
163    principal: PrincipalId,
164    /// Shared peak ledger.
165    ledger: MemoryLedger,
166}
167
168impl StoreMemoryMeter {
169    /// Build a meter capped at `max_memory_bytes`, attributing growth to
170    /// `principal`, recording into `ledger`.
171    #[must_use]
172    pub fn new(max_memory_bytes: usize, principal: PrincipalId, ledger: MemoryLedger) -> Self {
173        Self {
174            max_memory_bytes,
175            principal,
176            ledger,
177        }
178    }
179
180    /// Re-target for a new invocation: the principal's memory ceiling and the
181    /// principal to attribute peak growth to. Called at invocation SET, since a
182    /// pooled Store crosses principals.
183    pub fn set(&mut self, max_memory_bytes: usize, principal: PrincipalId) {
184        self.max_memory_bytes = max_memory_bytes;
185        self.principal = principal;
186    }
187}
188
189impl wasmtime::ResourceLimiter for StoreMemoryMeter {
190    fn memory_growing(
191        &mut self,
192        _current: usize,
193        desired: usize,
194        maximum: Option<usize>,
195    ) -> wasmtime::Result<bool> {
196        // Enforce the per-invocation byte ceiling (what `StoreLimits` did).
197        if desired > self.max_memory_bytes {
198            return Ok(false);
199        }
200        if let Some(max) = maximum
201            && desired > max
202        {
203            return Ok(false);
204        }
205        // Attribute the new high-water size to the invoking principal.
206        self.ledger
207            .record_peak(&self.principal, u64::try_from(desired).unwrap_or(u64::MAX));
208        Ok(true)
209    }
210
211    fn table_growing(
212        &mut self,
213        _current: usize,
214        _desired: usize,
215        _maximum: Option<usize>,
216    ) -> wasmtime::Result<bool> {
217        // Tables are unbounded here, matching the prior `StoreLimits` (which set
218        // only `memory_size`). Only linear memory is metered.
219        Ok(true)
220    }
221}
222
#[cfg(test)]
mod tests {
    use super::*;

    /// The recorded peak only ever moves upward, and a zero sample is ignored.
    #[test]
    fn record_peak_keeps_the_high_water_mark() {
        let ledger = MemoryLedger::default();
        let who = PrincipalId::default();
        assert_eq!(ledger.peak(&who), 0);

        // (sample recorded, peak expected afterwards), applied in order:
        // first sample sets the peak, a lower one leaves it, a higher one
        // raises it, and zero is ignored.
        let steps = [(1000, 1000), (500, 1000), (4096, 4096), (0, 4096)];
        for (sample, expected) in steps {
            ledger.record_peak(&who, sample);
            assert_eq!(ledger.peak(&who), expected);
        }
    }

    /// Peaks are tracked separately per principal, and every clone of the
    /// ledger reads and writes the same underlying map.
    #[test]
    fn ledger_is_per_principal_and_shared_across_clones() {
        let original = MemoryLedger::default();
        let alice = PrincipalId::new("alice").unwrap();
        let bob = PrincipalId::new("bob").unwrap();

        original.record_peak(&alice, 2048);
        // Record through a clone; the original handle must observe it.
        let handle = original.clone();
        handle.record_peak(&bob, 8192);

        assert_eq!(original.peak(&alice), 2048);
        assert_eq!(original.peak(&bob), 8192);
        assert_eq!(handle.peak(&alice), 2048);
    }

    /// At capacity a bigger newcomer displaces the lowest-peak entry, while a
    /// newcomer that is not bigger is dropped.
    #[test]
    fn ledger_is_bounded_and_evicts_the_lowest_peak() {
        let ledger = MemoryLedger::default();
        // Fill to capacity. Principal `p<index>` gets peak `index + 1`, so all
        // peaks are distinct and the lowest belongs to `p0` (peak 1).
        for (index, peak) in (0..MAX_PRINCIPALS).zip(1u64..) {
            let filler = PrincipalId::new(format!("p{index}")).unwrap();
            ledger.record_peak(&filler, peak);
        }
        assert_eq!(ledger.inner.len(), MAX_PRINCIPALS);
        let p0 = PrincipalId::new("p0").unwrap();
        assert_eq!(ledger.peak(&p0), 1);

        // A NEW principal with a high peak pushes out the lowest (`p0`) and
        // the map stays bounded.
        let big = PrincipalId::new("newcomer").unwrap();
        ledger.record_peak(&big, 1_000_000);
        assert!(ledger.inner.len() <= MAX_PRINCIPALS, "stays bounded");
        assert_eq!(ledger.peak(&big), 1_000_000, "newcomer recorded");
        assert_eq!(ledger.peak(&p0), 0, "lowest-peak principal evicted");

        // A NEW principal whose peak is NOT above the current lowest must be
        // DROPPED rather than evict a bigger user (otherwise a flood of small
        // ephemeral principals would thrash out the real ones). With `p0`
        // gone the smallest retained user is `p1` (peak 2); a newcomer at
        // peak 2 (equal to the lowest) must not displace it.
        let p1 = PrincipalId::new("p1").unwrap();
        assert_eq!(ledger.peak(&p1), 2, "p1 is now the lowest retained user");
        let small = PrincipalId::new("smaller").unwrap();
        ledger.record_peak(&small, 2);
        assert_eq!(
            ledger.peak(&small),
            0,
            "smaller newcomer dropped, not recorded"
        );
        assert_eq!(ledger.peak(&p1), 2, "existing bigger user retained");
    }

    /// The meter denies growth past its ceiling, records permitted growth, and
    /// follows `set` to a new principal and ceiling.
    #[test]
    fn meter_enforces_ceiling_and_records_peak() {
        use wasmtime::ResourceLimiter;

        let ledger = MemoryLedger::default();
        let carol = PrincipalId::new("carol").unwrap();
        let mut meter = StoreMemoryMeter::new(64 * 1024, carol.clone(), ledger.clone());

        // Under the cap: permitted, and the size is recorded.
        let allowed = meter.memory_growing(0, 16 * 1024, None).unwrap();
        assert!(allowed);
        assert_eq!(ledger.peak(&carol), 16 * 1024);

        // A further grow raises the peak.
        let allowed = meter.memory_growing(16 * 1024, 48 * 1024, None).unwrap();
        assert!(allowed);
        assert_eq!(ledger.peak(&carol), 48 * 1024);

        // Over the ceiling: denied, and the peak does not move.
        let allowed = meter.memory_growing(48 * 1024, 128 * 1024, None).unwrap();
        assert!(!allowed);
        assert_eq!(ledger.peak(&carol), 48 * 1024);

        // Re-target to another principal and cap; the previous principal's
        // peak is left as it was.
        let dave = PrincipalId::new("dave").unwrap();
        meter.set(256 * 1024, dave.clone());
        let allowed = meter.memory_growing(0, 200 * 1024, None).unwrap();
        assert!(allowed);
        assert_eq!(ledger.peak(&dave), 200 * 1024);
        assert_eq!(ledger.peak(&carol), 48 * 1024);
    }
}