Skip to main content

astrid_events/route/
entry.rs

1//! `RouteEntry` state machine: per-route fan-out, DRR drain, and
2//! oldest-head eviction under the global byte budget.
3
4use std::collections::{HashMap, VecDeque};
5use std::sync::Arc;
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::time::Instant;
8
9use tokio::sync::Notify;
10use uuid::Uuid;
11
12use crate::event::AstridEvent;
13use crate::route::matcher::{TopicMatcher, ipc_size_of, principal_class_label};
14
15/// Maximum bytes a single subscription will hold across all its
16/// per-principal sub-queues before publish-side head-eviction kicks in.
17/// Matches the per-call IPC payload ceiling so any single message still
18/// fits within one budget.
19pub const MAX_SUBSCRIPTION_BUDGET_BYTES: usize = 1024 * 1024;
20
21/// Per-round DRR floor. 5000 active principals × 4 `KiB` = 20 `MiB` of
22/// theoretical per-round throughput, well above the 1 `MiB` total budget,
23/// so a round always serves something for every principal in the
24/// rotation rather than starving the long tail.
25pub const DRR_QUANTUM_MIN_BYTES: usize = 4 * 1024;
26
27/// Defence-in-depth message-count cap per principal sub-queue. The byte
28/// budget is the primary admission control; this just stops a flood of
29/// 0-byte messages from monopolising one bucket.
30pub(crate) const PENDING_PER_PRINCIPAL_FALLBACK: usize = 256;
31
32/// Counter: head messages evicted from a per-principal sub-queue because
33/// the route's global byte budget would otherwise be exceeded. Labelled
34/// by `capsule` (the subscribing capsule) and bounded `principal_class`.
35pub const METRIC_ROUTE_BYTE_EVICTIONS_TOTAL: &str = "astrid_capsule_route_byte_evictions_total";
36
37/// Counter: rounds in which a principal's deficit-round-robin quantum
38/// could not cover its queue head message (sustained back pressure).
39/// Diagnostic — not a drop signal.
40pub const METRIC_ROUTE_QUANTUM_STARVED_TOTAL: &str = "astrid_capsule_route_quantum_starved_total";
41
42/// Composite identity of a single routed subscription on the bus.
43///
44/// Two guest capsules subscribing to the same pattern receive distinct
45/// `RouteKey`s — `subscription_rep` is unique per call — so messages
46/// fan out per subscriber rather than being shared like the broadcast
47/// channel.
48#[derive(Debug, Clone, PartialEq, Eq, Hash)]
49pub struct RouteKey {
50    /// Capsule UUID owning the subscription. Bounded by deployed
51    /// capsule count.
52    pub capsule_uuid: Uuid,
53    /// Topic pattern as supplied to `subscribe_topic_routed`.
54    pub topic_pattern: String,
55    /// Monotonic per-bus subscription id; distinguishes multiple
56    /// subscriptions of the same `(capsule_uuid, topic_pattern)` pair.
57    pub subscription_rep: u64,
58}
59
60/// Principal identity key for a fan-out bucket. `None` = system (kernel)
61/// principal; `Some(s)` = user/agent principal string.
62pub type PrincipalKey = Option<String>;
63
64/// Per-principal FIFO sub-queue inside a route.
65#[derive(Debug)]
66pub(crate) struct PrincipalQueue {
67    /// Queued events in publish order.
68    pub(crate) queue: VecDeque<Arc<AstridEvent>>,
69    /// Sum of `ipc_size_of(event)` for every event currently in `queue`.
70    pub(crate) bytes: usize,
71    /// Enqueue time of the current head, used for oldest-head eviction
72    /// under byte pressure. `None` ⇔ `queue.is_empty()`.
73    pub(crate) head_enqueued_at: Option<Instant>,
74    /// DRR deficit carried over rounds.
75    pub(crate) deficit: usize,
76}
77
78impl PrincipalQueue {
79    fn new() -> Self {
80        Self {
81            queue: VecDeque::new(),
82            bytes: 0,
83            head_enqueued_at: None,
84            deficit: 0,
85        }
86    }
87}
88
89/// Route-side fan-out entry. One per `(capsule, topic_pattern, subscription)`.
90#[derive(Debug)]
91pub(crate) struct RouteEntry {
92    /// Compiled topic matcher.
93    pub(crate) matcher: TopicMatcher,
94    /// Per-principal FIFO sub-queues. Demand-allocated: an idle principal
95    /// has zero entries.
96    pub(crate) fanout: HashMap<PrincipalKey, PrincipalQueue>,
97    /// FIFO order of distinct principal keys for DRR rotation.
98    pub(crate) principal_order: VecDeque<PrincipalKey>,
99    /// Sum of all per-principal `bytes`.
100    pub(crate) total_bytes: usize,
101    /// Stable capsule label for telemetry. Bounded by deployed capsule
102    /// count.
103    pub(crate) capsule_id_label: String,
104    /// Authz self-scope. `Some(p)` ⇒ only events whose publisher
105    /// [`PrincipalKey`] equals `p` are admitted; foreign-principal events
106    /// are dropped at enqueue ([`accepts`](Self::accepts) → `false`) so
107    /// they never consume this route's 1 `MiB` byte budget, never enter
108    /// the [`fanout`](Self::fanout) map, and can never head-evict the
109    /// owner's own entries. `None` ⇒ unscoped (all principals) — the
110    /// default and the only behaviour pre-existing subscriptions get.
111    ///
112    /// This is orthogonal to the per-principal DRR fan-out buckets, which
113    /// remain fairness-only (#813): scoping decides *admission*, DRR
114    /// decides *order* among whatever was admitted. A scoped route admits
115    /// at most one principal, so its DRR rotation holds exactly one
116    /// bucket — strictly less work than an unscoped firehose route.
117    ///
118    /// COMPLETENESS is CROSS-PRINCIPAL ONLY. The guarantee above ("can never
119    /// head-evict the owner's own entries") is about FOREIGN principals — a
120    /// noisy co-principal cannot evict the owner. A scoped route still applies
121    /// the 1 `MiB` byte budget and the 256-message per-principal cap to the
122    /// owner's OWN bucket, so if the owner publishes faster than the consumer
123    /// drains, the owner's OWN oldest entries can still self-evict (host
124    /// `tracing::error!`/metric only — no in-band guest signal). A
125    /// completeness-critical consumer (e.g. an audit-to-blockchain mint
126    /// pipeline) must therefore drain promptly or treat the persisted audit
127    /// log (`append_with_principal`) as the source of truth, not the live bus
128    /// feed. In-band drop-signalling / log reconciliation is a future
129    /// (Phase 2) concern.
130    pub(crate) scope: Option<PrincipalKey>,
131    /// Wakeup for `RoutedEventReceiver::recv`.
132    pub(crate) notify: Arc<Notify>,
133}
134
135impl RouteEntry {
136    /// Construct a new entry for the given matcher.
137    ///
138    /// `scope` self-scopes the route to a single publisher principal (see
139    /// [`scope`](Self::scope)); pass `None` for the unscoped, all-principals
140    /// behaviour that every pre-existing subscription relies on. The
141    /// argument is mandatory so every constructor must state its intent —
142    /// a forgotten scope is a compile error, the secure-by-default failure
143    /// mode.
144    pub(crate) fn new(
145        matcher: TopicMatcher,
146        capsule_id_label: String,
147        scope: Option<PrincipalKey>,
148    ) -> Self {
149        Self {
150            matcher,
151            fanout: HashMap::new(),
152            principal_order: VecDeque::new(),
153            total_bytes: 0,
154            capsule_id_label,
155            scope,
156            notify: Arc::new(Notify::new()),
157        }
158    }
159
160    /// Whether an event published by `publisher` is admitted into this
161    /// route. Unscoped routes (`scope == None`) admit every publisher;
162    /// scoped routes admit only their own principal. This is the single
163    /// named home of the authz rule — both the [`dispatch_to_routes`]
164    /// notify-skip and the [`push_with_eviction`](Self::push_with_eviction)
165    /// defence-in-depth guard consult it.
166    ///
167    /// [`dispatch_to_routes`]: crate::bus::EventBus
168    pub(crate) fn accepts(&self, publisher: &PrincipalKey) -> bool {
169        self.scope.as_ref().is_none_or(|s| s == publisher)
170    }
171
172    /// Push an event into the route, applying oldest-head eviction under
173    /// the global byte budget. Returns the number of evictions that
174    /// happened to make room.
175    pub(crate) fn push_with_eviction(
176        &mut self,
177        event: Arc<AstridEvent>,
178        principal: PrincipalKey,
179        budget_bytes: usize,
180    ) -> usize {
181        // Defence in depth: enforce the self-scope at the very TOP, before
182        // the oversize reject and the eviction loop, so a foreign-principal
183        // event never touches `total_bytes`, `fanout`, or `principal_order`
184        // even if a future caller forgets the `accepts()` gate in
185        // `dispatch_to_routes`. A scoped route's budget is therefore only
186        // ever consumable by its own principal.
187        if !self.accepts(&principal) {
188            return 0;
189        }
190
191        let msg_size = ipc_size_of(&event);
192
193        if msg_size > budget_bytes {
194            // Pathological: single message exceeds budget. Reject
195            // rather than evict everything.
196            let class = principal_class_label(principal.as_deref());
197            tracing::error!(
198                target: "astrid.audit.ipc",
199                security_event = true,
200                capsule = %self.capsule_id_label,
201                principal = principal.as_deref().unwrap_or("<none>"),
202                msg_size,
203                budget_bytes,
204                "ipc::route: incoming message exceeds global byte budget, rejecting publish",
205            );
206            metrics::counter!(
207                METRIC_ROUTE_BYTE_EVICTIONS_TOTAL,
208                "capsule" => self.capsule_id_label.clone(),
209                "principal_class" => class,
210            )
211            .increment(1);
212            return 1;
213        }
214
215        let mut evictions = 0usize;
216        while self.total_bytes.saturating_add(msg_size) > budget_bytes {
217            if !self.evict_oldest_head() {
218                // No queues to evict from but budget would still be
219                // exceeded — should not happen since msg_size ≤
220                // budget_bytes and total_bytes == 0 implies no queues.
221                break;
222            }
223            evictions = evictions.saturating_add(1);
224        }
225
226        let now = Instant::now();
227        let is_new = !self.fanout.contains_key(&principal);
228        let bucket = self
229            .fanout
230            .entry(principal.clone())
231            .or_insert_with(PrincipalQueue::new);
232
233        if bucket.queue.is_empty() {
234            bucket.head_enqueued_at = Some(now);
235        }
236        bucket.queue.push_back(event);
237        bucket.bytes = bucket.bytes.saturating_add(msg_size);
238        self.total_bytes = self.total_bytes.saturating_add(msg_size);
239
240        // Defence in depth: per-bucket message-count cap.
241        if bucket.queue.len() > PENDING_PER_PRINCIPAL_FALLBACK
242            && let Some(dropped) = bucket.queue.pop_front()
243        {
244            let dropped_size = ipc_size_of(&dropped);
245            bucket.bytes = bucket.bytes.saturating_sub(dropped_size);
246            self.total_bytes = self.total_bytes.saturating_sub(dropped_size);
247            bucket.head_enqueued_at = if bucket.queue.is_empty() {
248                None
249            } else {
250                Some(Instant::now())
251            };
252            let class = principal_class_label(principal.as_deref());
253            tracing::error!(
254                target: "astrid.audit.ipc",
255                security_event = true,
256                capsule = %self.capsule_id_label,
257                principal = principal.as_deref().unwrap_or("<none>"),
258                cap = PENDING_PER_PRINCIPAL_FALLBACK,
259                "ipc::route: per-principal queue cap reached, dropping oldest",
260            );
261            metrics::counter!(
262                METRIC_ROUTE_BYTE_EVICTIONS_TOTAL,
263                "capsule" => self.capsule_id_label.clone(),
264                "principal_class" => class,
265            )
266            .increment(1);
267        }
268
269        if is_new {
270            self.principal_order.push_back(principal);
271        }
272
273        evictions
274    }
275
276    /// Evict the head of the oldest-head queue. Returns true if an event
277    /// was evicted.
278    fn evict_oldest_head(&mut self) -> bool {
279        let Some(victim_key) = self.oldest_head_key() else {
280            return false;
281        };
282        let Some(bucket) = self.fanout.get_mut(&victim_key) else {
283            return false;
284        };
285        let Some(evicted) = bucket.queue.pop_front() else {
286            return false;
287        };
288        let evicted_size = ipc_size_of(&evicted);
289        bucket.bytes = bucket.bytes.saturating_sub(evicted_size);
290        self.total_bytes = self.total_bytes.saturating_sub(evicted_size);
291        bucket.head_enqueued_at = if bucket.queue.is_empty() {
292            None
293        } else {
294            // We don't track per-event push time, so on subsequent
295            // evictions the next head's age is approximated by "now".
296            // For correctness of the eviction order this is fine — we
297            // only care that the queue we just trimmed is no longer the
298            // oldest-head queue.
299            Some(Instant::now())
300        };
301
302        let evicted_topic = match &*evicted {
303            AstridEvent::Ipc { message, .. } => message.topic.clone(),
304            other => other.event_type().to_string(),
305        };
306        let class = principal_class_label(victim_key.as_deref());
307        tracing::error!(
308            target: "astrid.audit.ipc",
309            security_event = true,
310            capsule = %self.capsule_id_label,
311            principal = victim_key.as_deref().unwrap_or("<none>"),
312            evicted_topic = %evicted_topic,
313            total_bytes = self.total_bytes,
314            "ipc::route: global byte budget exhausted, dropping head of oldest queue",
315        );
316        metrics::counter!(
317            METRIC_ROUTE_BYTE_EVICTIONS_TOTAL,
318            "capsule" => self.capsule_id_label.clone(),
319            "principal_class" => class,
320        )
321        .increment(1);
322        true
323    }
324
325    /// Linear scan to find the bucket whose `head_enqueued_at` is minimum.
326    /// At 5000 principals on one route this is a hot spot under
327    /// sustained pressure; the follow-up `BTreeMap<Instant, PrincipalKey>`
328    /// head-age index lives in this same module if benchmarks show it
329    /// matters.
330    fn oldest_head_key(&self) -> Option<PrincipalKey> {
331        self.fanout
332            .iter()
333            .filter_map(|(k, q)| q.head_enqueued_at.map(|t| (t, k.clone())))
334            .min_by_key(|(t, _)| *t)
335            .map(|(_, k)| k)
336    }
337
338    /// Deficit-round-robin drain into `out` up to `budget`. Returns the
339    /// total bytes served. Empty queues are removed; partially served
340    /// queues remain at the back of the rotation.
341    pub(crate) fn drr_drain(&mut self, out: &mut Vec<Arc<AstridEvent>>, budget: usize) -> usize {
342        if self.fanout.is_empty() || budget == 0 {
343            return 0;
344        }
345
346        let mut served = 0usize;
347        let total = self.principal_order.len().max(1);
348        let quantum = std::cmp::max(
349            DRR_QUANTUM_MIN_BYTES,
350            budget.checked_div(total).unwrap_or(0),
351        );
352
353        loop {
354            let mut progress = false;
355            let visit = self.principal_order.len();
356            for _ in 0..visit {
357                let Some(key) = self.principal_order.pop_front() else {
358                    break;
359                };
360                let Some(bucket) = self.fanout.get_mut(&key) else {
361                    continue;
362                };
363                bucket.deficit = bucket.deficit.saturating_add(quantum);
364
365                let mut bucket_progress = false;
366                while let Some(front) = bucket.queue.front() {
367                    let sz = ipc_size_of(front);
368                    if sz > bucket.deficit || served.saturating_add(sz) > budget {
369                        break;
370                    }
371                    let msg = bucket.queue.pop_front().expect("front checked above");
372                    bucket.deficit = bucket.deficit.saturating_sub(sz);
373                    bucket.bytes = bucket.bytes.saturating_sub(sz);
374                    self.total_bytes = self.total_bytes.saturating_sub(sz);
375                    served = served.saturating_add(sz);
376                    out.push(msg);
377                    bucket_progress = true;
378                    // Refresh head age to the new head's enqueue time.
379                    // We don't track per-message enqueue times, so use
380                    // `now()` as a conservative approximation; it
381                    // affects only future eviction ordering, never
382                    // semantics.
383                    bucket.head_enqueued_at = if bucket.queue.is_empty() {
384                        None
385                    } else {
386                        Some(Instant::now())
387                    };
388                }
389                progress |= bucket_progress;
390
391                if !bucket_progress && !bucket.queue.is_empty() {
392                    // Could not cover head with this round's deficit.
393                    metrics::counter!(
394                        METRIC_ROUTE_QUANTUM_STARVED_TOTAL,
395                        "capsule" => self.capsule_id_label.clone(),
396                        "principal_class" => principal_class_label(key.as_deref()),
397                    )
398                    .increment(1);
399                }
400
401                if bucket.queue.is_empty() {
402                    self.fanout.remove(&key);
403                } else {
404                    self.principal_order.push_back(key);
405                }
406            }
407            if !progress || served >= budget {
408                break;
409            }
410        }
411
412        served
413    }
414
415    /// Number of distinct active principal buckets.
416    pub(crate) fn active_principals(&self) -> usize {
417        self.fanout.len()
418    }
419}
420
421/// Monotonic subscription-rep allocator shared across `EventBus` clones.
422#[derive(Debug, Default)]
423pub(crate) struct SubscriptionRepAllocator(pub(crate) AtomicU64);
424
425impl SubscriptionRepAllocator {
426    pub(crate) fn next(&self) -> u64 {
427        // Skip zero so it can sentinel "unallocated" if a debug path needs.
428        let v = self.0.fetch_add(1, Ordering::Relaxed);
429        v.saturating_add(1)
430    }
431}
432
433#[cfg(test)]
434mod tests {
435    use super::*;
436    use crate::event::EventMetadata;
437    use crate::ipc::{IpcMessage, IpcPayload};
438    use serde_json::json;
439    use uuid::Uuid;
440
441    fn ipc(topic: &str, principal: Option<&str>) -> Arc<AstridEvent> {
442        let mut msg = IpcMessage::new(topic, IpcPayload::RawJson(json!({})), Uuid::nil());
443        msg.principal = principal.map(String::from);
444        Arc::new(AstridEvent::Ipc {
445            metadata: EventMetadata::new("test"),
446            message: msg,
447        })
448    }
449
450    fn ipc_sized(topic: &str, principal: Option<&str>, payload_bytes: usize) -> Arc<AstridEvent> {
451        let blob = "x".repeat(payload_bytes);
452        let mut msg = IpcMessage::new(
453            topic,
454            IpcPayload::RawJson(json!({ "p": blob })),
455            Uuid::nil(),
456        );
457        msg.principal = principal.map(String::from);
458        Arc::new(AstridEvent::Ipc {
459            metadata: EventMetadata::new("test"),
460            message: msg,
461        })
462    }
463
464    #[test]
465    fn push_and_drain_single_principal() {
466        let mut entry = RouteEntry::new(TopicMatcher::new("t.*"), "capsule-a".into(), None);
467        for _ in 0..3 {
468            entry.push_with_eviction(
469                ipc("t.x", Some("alice")),
470                Some("alice".into()),
471                MAX_SUBSCRIPTION_BUDGET_BYTES,
472            );
473        }
474        let mut out = Vec::new();
475        entry.drr_drain(&mut out, MAX_SUBSCRIPTION_BUDGET_BYTES);
476        assert_eq!(out.len(), 3);
477        assert_eq!(entry.fanout.len(), 0);
478        assert_eq!(entry.total_bytes, 0);
479    }
480
481    #[test]
482    fn drr_two_principals_yield_equal_counts() {
483        // With the quantum floor of 4 KiB and tiny payloads, a single
484        // round drains every queue completely. Cross-principal
485        // fairness is therefore measured by equal *counts* delivered,
486        // not strict per-message interleaving — DRR semantics, not
487        // pure round-robin. Both principals should each see 2 events.
488        let mut entry = RouteEntry::new(TopicMatcher::new("t.*"), "capsule-a".into(), None);
489        for _ in 0..2 {
490            entry.push_with_eviction(
491                ipc("t.x", Some("alice")),
492                Some("alice".into()),
493                MAX_SUBSCRIPTION_BUDGET_BYTES,
494            );
495            entry.push_with_eviction(
496                ipc("t.x", Some("bob")),
497                Some("bob".into()),
498                MAX_SUBSCRIPTION_BUDGET_BYTES,
499            );
500        }
501        let mut out = Vec::new();
502        entry.drr_drain(&mut out, MAX_SUBSCRIPTION_BUDGET_BYTES);
503        assert_eq!(out.len(), 4);
504        let mut alice_count = 0;
505        let mut bob_count = 0;
506        for ev in &out {
507            if let AstridEvent::Ipc { message, .. } = &**ev {
508                match message.principal.as_deref() {
509                    Some("alice") => alice_count += 1,
510                    Some("bob") => bob_count += 1,
511                    _ => {},
512                }
513            }
514        }
515        assert_eq!(alice_count, 2);
516        assert_eq!(bob_count, 2);
517    }
518
519    #[test]
520    fn drr_interleaves_when_quantum_caps_per_round() {
521        // Tight budget = small per-principal quantum, so each round
522        // serves one message per principal before rotating. Inserted
523        // alice (large), bob (large) — drain visits alice, then bob,
524        // interleaved.
525        let payload_size = 8 * 1024; // 8 KiB > DRR_QUANTUM_MIN_BYTES/2
526        let budget = payload_size * 4 + 1024;
527        let mut entry = RouteEntry::new(TopicMatcher::new("t.*"), "capsule-a".into(), None);
528        entry.push_with_eviction(
529            ipc_sized("t.x", Some("alice"), payload_size),
530            Some("alice".into()),
531            budget,
532        );
533        entry.push_with_eviction(
534            ipc_sized("t.x", Some("bob"), payload_size),
535            Some("bob".into()),
536            budget,
537        );
538        entry.push_with_eviction(
539            ipc_sized("t.x", Some("alice"), payload_size),
540            Some("alice".into()),
541            budget,
542        );
543        entry.push_with_eviction(
544            ipc_sized("t.x", Some("bob"), payload_size),
545            Some("bob".into()),
546            budget,
547        );
548
549        let mut out = Vec::new();
550        // Drain with budget = payload_size * 4 so 4 messages can be served.
551        entry.drr_drain(&mut out, budget);
552        // Both principals must each see 2 messages — fairness held.
553        let mut alice_count = 0;
554        let mut bob_count = 0;
555        for ev in &out {
556            if let AstridEvent::Ipc { message, .. } = &**ev {
557                match message.principal.as_deref() {
558                    Some("alice") => alice_count += 1,
559                    Some("bob") => bob_count += 1,
560                    _ => {},
561                }
562            }
563        }
564        assert_eq!(alice_count, 2, "alice fairness");
565        assert_eq!(bob_count, 2, "bob fairness");
566    }
567
568    #[test]
569    fn drr_isolates_principals_under_burst() {
570        let mut entry = RouteEntry::new(TopicMatcher::new("t.*"), "capsule-a".into(), None);
571        for _ in 0..200 {
572            entry.push_with_eviction(
573                ipc("t.x", Some("alice")),
574                Some("alice".into()),
575                MAX_SUBSCRIPTION_BUDGET_BYTES,
576            );
577        }
578        // Only alice; bob has zero entries — demand-allocation invariant.
579        assert_eq!(entry.fanout.len(), 1);
580        assert!(entry.fanout.contains_key(&Some("alice".into())));
581    }
582
583    #[test]
584    fn eviction_drops_oldest_head_under_budget() {
585        // Budget tuned to ~3 large messages; pushing 4 forces eviction
586        // of the oldest head (alice's first), not bob's later message.
587        let payload_size = 64 * 1024;
588        let budget = payload_size * 3 + 4096;
589        let mut entry = RouteEntry::new(TopicMatcher::new("t.*"), "capsule-a".into(), None);
590
591        for _ in 0..3 {
592            entry.push_with_eviction(
593                ipc_sized("t.alice", Some("alice"), payload_size),
594                Some("alice".into()),
595                budget,
596            );
597        }
598        // Sanity: alice now holds 3 entries.
599        assert_eq!(
600            entry
601                .fanout
602                .get(&Some("alice".into()))
603                .map(|q| q.queue.len()),
604            Some(3)
605        );
606
607        // Push bob — fits without eviction.
608        entry.push_with_eviction(
609            ipc_sized("t.bob.terminator", Some("bob"), payload_size / 4),
610            Some("bob".into()),
611            budget,
612        );
613
614        // Force budget overflow by pushing a new large alice.
615        entry.push_with_eviction(
616            ipc_sized("t.alice.new", Some("alice"), payload_size),
617            Some("alice".into()),
618            budget,
619        );
620
621        // Alice's earliest head must have been evicted; bob's tail
622        // (the terminator) must still be present.
623        let alice_q = entry
624            .fanout
625            .get(&Some("alice".into()))
626            .expect("alice queue");
627        let bob_q = entry.fanout.get(&Some("bob".into())).expect("bob queue");
628        assert!(bob_q.queue.iter().any(|e| match &**e {
629            AstridEvent::Ipc { message, .. } => message.topic == "t.bob.terminator",
630            _ => false,
631        }));
632        // Alice should have evicted at least one of its earlier
633        // entries to make room.
634        assert!(
635            alice_q.queue.len() < 4,
636            "alice queue should have shed at least one head"
637        );
638    }
639
640    #[test]
641    fn pathological_message_alone_is_rejected() {
642        // budget = 1 KiB; message > 1 KiB → rejected, queue unchanged.
643        let small_budget = 1024;
644        let mut entry = RouteEntry::new(TopicMatcher::new("t.*"), "capsule-a".into(), None);
645        entry.push_with_eviction(
646            ipc_sized("t.alice", Some("alice"), 4096),
647            Some("alice".into()),
648            small_budget,
649        );
650        assert_eq!(entry.fanout.len(), 0);
651        assert_eq!(entry.total_bytes, 0);
652    }
653
654    #[test]
655    fn fairness_under_5000_principals_makes_progress() {
656        let mut entry = RouteEntry::new(TopicMatcher::new("t.*"), "capsule-a".into(), None);
657        for i in 0..5000 {
658            let p = format!("p{i}");
659            entry.push_with_eviction(ipc("t.x", Some(&p)), Some(p), MAX_SUBSCRIPTION_BUDGET_BYTES);
660        }
661        let mut out = Vec::new();
662        entry.drr_drain(&mut out, MAX_SUBSCRIPTION_BUDGET_BYTES);
663        // Every principal had exactly one tiny message; one round
664        // should drain all of them under the quantum floor.
665        assert_eq!(out.len(), 5000);
666        assert_eq!(entry.fanout.len(), 0);
667    }
668
669    // ── Self-scope (Option B route-level audit scoping) ──────────────
670
671    #[test]
672    fn accepts_predicate_authz_rule() {
673        // Scoped to alice: only alice's publisher key is admitted.
674        let scoped = RouteEntry::new(
675            TopicMatcher::new("t.*"),
676            "capsule-a".into(),
677            Some(Some("alice".into())),
678        );
679        assert!(scoped.accepts(&Some("alice".into())));
680        assert!(!scoped.accepts(&Some("bob".into())));
681        // The system/kernel (None) bucket is foreign to a user-scoped route.
682        assert!(!scoped.accepts(&None));
683
684        // Unscoped: every publisher (including system None) is admitted.
685        let unscoped = RouteEntry::new(TopicMatcher::new("t.*"), "capsule-a".into(), None);
686        assert!(unscoped.accepts(&Some("alice".into())));
687        assert!(unscoped.accepts(&Some("bob".into())));
688        assert!(unscoped.accepts(&None));
689    }
690
691    #[test]
692    fn scoped_drops_foreign_at_enqueue() {
693        // A route scoped to alice must drop bob's events at enqueue: bob's
694        // bytes never enter total_bytes, bob gets no fanout bucket, and a
695        // drain yields only alice.
696        let mut entry = RouteEntry::new(
697            TopicMatcher::new("t.*"),
698            "capsule-a".into(),
699            Some(Some("alice".into())),
700        );
701        for _ in 0..3 {
702            entry.push_with_eviction(
703                ipc("t.x", Some("alice")),
704                Some("alice".into()),
705                MAX_SUBSCRIPTION_BUDGET_BYTES,
706            );
707        }
708        for _ in 0..5 {
709            let evicted = entry.push_with_eviction(
710                ipc("t.x", Some("bob")),
711                Some("bob".into()),
712                MAX_SUBSCRIPTION_BUDGET_BYTES,
713            );
714            assert_eq!(evicted, 0, "foreign push is a no-op, never evicts");
715        }
716        // Only alice's bucket exists; bob's bytes never accrued.
717        assert_eq!(entry.fanout.len(), 1);
718        assert!(entry.fanout.contains_key(&Some("alice".into())));
719        assert!(!entry.fanout.contains_key(&Some("bob".into())));
720
721        let mut out = Vec::new();
722        entry.drr_drain(&mut out, MAX_SUBSCRIPTION_BUDGET_BYTES);
723        assert_eq!(out.len(), 3, "only alice's three events drain");
724        for ev in &out {
725            if let AstridEvent::Ipc { message, .. } = &**ev {
726                assert_eq!(message.principal.as_deref(), Some("alice"));
727            }
728        }
729    }
730
731    #[test]
732    fn scoped_budget_not_evictable_by_foreign_burst() {
733        // THE Option-B completeness guarantee: a foreign-principal burst far
734        // past the budget can NEVER evict the owner's entries, because the
735        // foreign bytes never enter the budget in the first place. A
736        // drain-time-filter design would FAIL this — bob's bytes would
737        // occupy the shared budget and head-evict alice before any filter.
738        let payload_size = 64 * 1024;
739        let budget = payload_size * 3 + 4096;
740        let mut entry = RouteEntry::new(
741            TopicMatcher::new("t.*"),
742            "capsule-a".into(),
743            Some(Some("alice".into())),
744        );
745        // Alice writes one entry well within budget.
746        entry.push_with_eviction(
747            ipc_sized("t.alice.keep", Some("alice"), payload_size),
748            Some("alice".into()),
749            budget,
750        );
751        // Bob floods far past the budget.
752        for _ in 0..100 {
753            entry.push_with_eviction(
754                ipc_sized("t.bob.flood", Some("bob"), payload_size),
755                Some("bob".into()),
756                budget,
757            );
758        }
759        // Alice's single entry is intact; bob never entered.
760        let alice_q = entry
761            .fanout
762            .get(&Some("alice".into()))
763            .expect("alice queue survives");
764        assert_eq!(alice_q.queue.len(), 1, "alice's entry never evicted");
765        assert!(!entry.fanout.contains_key(&Some("bob".into())));
766        assert_eq!(entry.total_bytes, alice_q.bytes);
767    }
768
769    #[test]
770    fn alloc_increments_monotonically() {
771        let a = SubscriptionRepAllocator::default();
772        let n1 = a.next();
773        let n2 = a.next();
774        assert_eq!(n2, n1.saturating_add(1));
775    }
776}