1use std::collections::{HashMap, VecDeque};
5use std::sync::Arc;
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::time::Instant;
8
9use tokio::sync::Notify;
10use uuid::Uuid;
11
12use crate::event::AstridEvent;
13use crate::route::matcher::{TopicMatcher, ipc_size_of, principal_class_label};
14
/// Maximum byte budget for one subscription's pending queues (1 MiB).
///
/// This file's tests pass it both as `budget_bytes` to
/// `RouteEntry::push_with_eviction` and as `budget` to `RouteEntry::drr_drain`;
/// presumably production callers do the same — confirm at the call sites.
pub const MAX_SUBSCRIPTION_BUDGET_BYTES: usize = 1024 * 1024;

/// Floor for the per-visit deficit credit (the DRR quantum) used by
/// `RouteEntry::drr_drain` (4 KiB). The quantum is `budget / principals`, so
/// without this floor a large number of principals would shrink it toward zero.
pub const DRR_QUANTUM_MIN_BYTES: usize = 4 * 1024;

/// Maximum number of events a single principal may have queued in one
/// `RouteEntry`; once exceeded, that principal's own oldest event is dropped.
pub(crate) const PENDING_PER_PRINCIPAL_FALLBACK: usize = 256;

/// Counter incremented once per event dropped or rejected while enqueueing:
/// an oversized publish, a per-principal cap drop, or a global byte-budget
/// eviction. Labels: `capsule`, `principal_class`.
pub const METRIC_ROUTE_BYTE_EVICTIONS_TOTAL: &str = "astrid_capsule_route_byte_evictions_total";

/// Counter incremented by `RouteEntry::drr_drain` each time a visit to a
/// non-empty principal queue serves nothing (the head did not fit the
/// principal's deficit or the remaining drain budget).
/// Labels: `capsule`, `principal_class`.
pub const METRIC_ROUTE_QUANTUM_STARVED_TOTAL: &str = "astrid_capsule_route_quantum_starved_total";
41
/// Hashable identifier for one subscription route.
///
/// NOTE(review): presumably one key per (capsule, topic pattern, subscription
/// instance); `subscription_rep` looks like a value from
/// `SubscriptionRepAllocator::next` — confirm against the code that builds keys.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct RouteKey {
    /// UUID of the capsule this route belongs to.
    pub capsule_uuid: Uuid,
    /// Topic pattern the subscription was registered with.
    pub topic_pattern: String,
    /// Distinguishes otherwise-identical subscriptions (assumed; see note above).
    pub subscription_rep: u64,
}
59
/// Key for per-principal queues: `Some(name)` for a named publisher, `None` for
/// an event without a principal (rendered as `<none>` in audit logs).
pub type PrincipalKey = Option<String>;
63
64#[derive(Debug)]
66pub(crate) struct PrincipalQueue {
67 pub(crate) queue: VecDeque<Arc<AstridEvent>>,
69 pub(crate) bytes: usize,
71 pub(crate) head_enqueued_at: Option<Instant>,
74 pub(crate) deficit: usize,
76}
77
78impl PrincipalQueue {
79 fn new() -> Self {
80 Self {
81 queue: VecDeque::new(),
82 bytes: 0,
83 head_enqueued_at: None,
84 deficit: 0,
85 }
86 }
87}
88
/// Pending-delivery state for one route: per-principal queues that share one
/// byte budget and are drained with deficit round robin.
#[derive(Debug)]
pub(crate) struct RouteEntry {
    /// Topic matcher for this route (matching itself is not performed in this file).
    pub(crate) matcher: TopicMatcher,
    /// Per-principal queues of pending events.
    pub(crate) fanout: HashMap<PrincipalKey, PrincipalQueue>,
    /// Round-robin visiting order for `drr_drain`. A key is appended when its
    /// bucket is created and dropped when `drr_drain` removes the emptied bucket.
    pub(crate) principal_order: VecDeque<PrincipalKey>,
    /// Sum of `bytes` across all queues in `fanout`.
    pub(crate) total_bytes: usize,
    /// Capsule identifier used as the `capsule` label in audit logs and metrics.
    pub(crate) capsule_id_label: String,
    /// Publisher restriction: `None` accepts every publisher, `Some(p)` only `p`.
    pub(crate) scope: Option<PrincipalKey>,
    /// Wake-up handle. NOTE(review): presumably notified after pushes and awaited
    /// by the consumer; it is only constructed in this file — confirm usage.
    pub(crate) notify: Arc<Notify>,
}
134
135impl RouteEntry {
136 pub(crate) fn new(
145 matcher: TopicMatcher,
146 capsule_id_label: String,
147 scope: Option<PrincipalKey>,
148 ) -> Self {
149 Self {
150 matcher,
151 fanout: HashMap::new(),
152 principal_order: VecDeque::new(),
153 total_bytes: 0,
154 capsule_id_label,
155 scope,
156 notify: Arc::new(Notify::new()),
157 }
158 }
159
160 pub(crate) fn accepts(&self, publisher: &PrincipalKey) -> bool {
169 self.scope.as_ref().is_none_or(|s| s == publisher)
170 }
171
172 pub(crate) fn push_with_eviction(
176 &mut self,
177 event: Arc<AstridEvent>,
178 principal: PrincipalKey,
179 budget_bytes: usize,
180 ) -> usize {
181 if !self.accepts(&principal) {
188 return 0;
189 }
190
191 let msg_size = ipc_size_of(&event);
192
193 if msg_size > budget_bytes {
194 let class = principal_class_label(principal.as_deref());
197 tracing::error!(
198 target: "astrid.audit.ipc",
199 security_event = true,
200 capsule = %self.capsule_id_label,
201 principal = principal.as_deref().unwrap_or("<none>"),
202 msg_size,
203 budget_bytes,
204 "ipc::route: incoming message exceeds global byte budget, rejecting publish",
205 );
206 metrics::counter!(
207 METRIC_ROUTE_BYTE_EVICTIONS_TOTAL,
208 "capsule" => self.capsule_id_label.clone(),
209 "principal_class" => class,
210 )
211 .increment(1);
212 return 1;
213 }
214
215 let mut evictions = 0usize;
216 while self.total_bytes.saturating_add(msg_size) > budget_bytes {
217 if !self.evict_oldest_head() {
218 break;
222 }
223 evictions = evictions.saturating_add(1);
224 }
225
226 let now = Instant::now();
227 let is_new = !self.fanout.contains_key(&principal);
228 let bucket = self
229 .fanout
230 .entry(principal.clone())
231 .or_insert_with(PrincipalQueue::new);
232
233 if bucket.queue.is_empty() {
234 bucket.head_enqueued_at = Some(now);
235 }
236 bucket.queue.push_back(event);
237 bucket.bytes = bucket.bytes.saturating_add(msg_size);
238 self.total_bytes = self.total_bytes.saturating_add(msg_size);
239
240 if bucket.queue.len() > PENDING_PER_PRINCIPAL_FALLBACK
242 && let Some(dropped) = bucket.queue.pop_front()
243 {
244 let dropped_size = ipc_size_of(&dropped);
245 bucket.bytes = bucket.bytes.saturating_sub(dropped_size);
246 self.total_bytes = self.total_bytes.saturating_sub(dropped_size);
247 bucket.head_enqueued_at = if bucket.queue.is_empty() {
248 None
249 } else {
250 Some(Instant::now())
251 };
252 let class = principal_class_label(principal.as_deref());
253 tracing::error!(
254 target: "astrid.audit.ipc",
255 security_event = true,
256 capsule = %self.capsule_id_label,
257 principal = principal.as_deref().unwrap_or("<none>"),
258 cap = PENDING_PER_PRINCIPAL_FALLBACK,
259 "ipc::route: per-principal queue cap reached, dropping oldest",
260 );
261 metrics::counter!(
262 METRIC_ROUTE_BYTE_EVICTIONS_TOTAL,
263 "capsule" => self.capsule_id_label.clone(),
264 "principal_class" => class,
265 )
266 .increment(1);
267 }
268
269 if is_new {
270 self.principal_order.push_back(principal);
271 }
272
273 evictions
274 }
275
276 fn evict_oldest_head(&mut self) -> bool {
279 let Some(victim_key) = self.oldest_head_key() else {
280 return false;
281 };
282 let Some(bucket) = self.fanout.get_mut(&victim_key) else {
283 return false;
284 };
285 let Some(evicted) = bucket.queue.pop_front() else {
286 return false;
287 };
288 let evicted_size = ipc_size_of(&evicted);
289 bucket.bytes = bucket.bytes.saturating_sub(evicted_size);
290 self.total_bytes = self.total_bytes.saturating_sub(evicted_size);
291 bucket.head_enqueued_at = if bucket.queue.is_empty() {
292 None
293 } else {
294 Some(Instant::now())
300 };
301
302 let evicted_topic = match &*evicted {
303 AstridEvent::Ipc { message, .. } => message.topic.clone(),
304 other => other.event_type().to_string(),
305 };
306 let class = principal_class_label(victim_key.as_deref());
307 tracing::error!(
308 target: "astrid.audit.ipc",
309 security_event = true,
310 capsule = %self.capsule_id_label,
311 principal = victim_key.as_deref().unwrap_or("<none>"),
312 evicted_topic = %evicted_topic,
313 total_bytes = self.total_bytes,
314 "ipc::route: global byte budget exhausted, dropping head of oldest queue",
315 );
316 metrics::counter!(
317 METRIC_ROUTE_BYTE_EVICTIONS_TOTAL,
318 "capsule" => self.capsule_id_label.clone(),
319 "principal_class" => class,
320 )
321 .increment(1);
322 true
323 }
324
325 fn oldest_head_key(&self) -> Option<PrincipalKey> {
331 self.fanout
332 .iter()
333 .filter_map(|(k, q)| q.head_enqueued_at.map(|t| (t, k.clone())))
334 .min_by_key(|(t, _)| *t)
335 .map(|(_, k)| k)
336 }
337
338 pub(crate) fn drr_drain(&mut self, out: &mut Vec<Arc<AstridEvent>>, budget: usize) -> usize {
342 if self.fanout.is_empty() || budget == 0 {
343 return 0;
344 }
345
346 let mut served = 0usize;
347 let total = self.principal_order.len().max(1);
348 let quantum = std::cmp::max(
349 DRR_QUANTUM_MIN_BYTES,
350 budget.checked_div(total).unwrap_or(0),
351 );
352
353 loop {
354 let mut progress = false;
355 let visit = self.principal_order.len();
356 for _ in 0..visit {
357 let Some(key) = self.principal_order.pop_front() else {
358 break;
359 };
360 let Some(bucket) = self.fanout.get_mut(&key) else {
361 continue;
362 };
363 bucket.deficit = bucket.deficit.saturating_add(quantum);
364
365 let mut bucket_progress = false;
366 while let Some(front) = bucket.queue.front() {
367 let sz = ipc_size_of(front);
368 if sz > bucket.deficit || served.saturating_add(sz) > budget {
369 break;
370 }
371 let msg = bucket.queue.pop_front().expect("front checked above");
372 bucket.deficit = bucket.deficit.saturating_sub(sz);
373 bucket.bytes = bucket.bytes.saturating_sub(sz);
374 self.total_bytes = self.total_bytes.saturating_sub(sz);
375 served = served.saturating_add(sz);
376 out.push(msg);
377 bucket_progress = true;
378 bucket.head_enqueued_at = if bucket.queue.is_empty() {
384 None
385 } else {
386 Some(Instant::now())
387 };
388 }
389 progress |= bucket_progress;
390
391 if !bucket_progress && !bucket.queue.is_empty() {
392 metrics::counter!(
394 METRIC_ROUTE_QUANTUM_STARVED_TOTAL,
395 "capsule" => self.capsule_id_label.clone(),
396 "principal_class" => principal_class_label(key.as_deref()),
397 )
398 .increment(1);
399 }
400
401 if bucket.queue.is_empty() {
402 self.fanout.remove(&key);
403 } else {
404 self.principal_order.push_back(key);
405 }
406 }
407 if !progress || served >= budget {
408 break;
409 }
410 }
411
412 served
413 }
414
415 pub(crate) fn active_principals(&self) -> usize {
417 self.fanout.len()
418 }
419}
420
/// Hands out subscription rep numbers from a shared atomic counter.
///
/// On a default-constructed allocator the first [`SubscriptionRepAllocator::next`]
/// returns `1`, and each later call returns the following integer.
#[derive(Debug, Default)]
pub(crate) struct SubscriptionRepAllocator(pub(crate) AtomicU64);

impl SubscriptionRepAllocator {
    /// Returns one more than the counter's value before this call.
    ///
    /// Read-modify-write operations on a single atomic are totally ordered even
    /// with `Relaxed`, so concurrent callers always receive distinct values (until
    /// the underlying `fetch_add` wraps at `u64::MAX`).
    pub(crate) fn next(&self) -> u64 {
        self.0.fetch_add(1, Ordering::Relaxed).saturating_add(1)
    }
}
432
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::EventMetadata;
    use crate::ipc::{IpcMessage, IpcPayload};
    use serde_json::json;
    use uuid::Uuid;

    /// Builds an IPC event on `topic` with an empty JSON object payload, tagged
    /// with `principal`.
    fn ipc(topic: &str, principal: Option<&str>) -> Arc<AstridEvent> {
        let mut msg = IpcMessage::new(topic, IpcPayload::RawJson(json!({})), Uuid::nil());
        msg.principal = principal.map(String::from);
        Arc::new(AstridEvent::Ipc {
            metadata: EventMetadata::new("test"),
            message: msg,
        })
    }

    /// Like `ipc`, but the payload carries a `payload_bytes`-character string, so
    /// the event's measured size (presumably via `ipc_size_of`) scales with it.
    fn ipc_sized(topic: &str, principal: Option<&str>, payload_bytes: usize) -> Arc<AstridEvent> {
        let blob = "x".repeat(payload_bytes);
        let mut msg = IpcMessage::new(
            topic,
            IpcPayload::RawJson(json!({ "p": blob })),
            Uuid::nil(),
        );
        msg.principal = principal.map(String::from);
        Arc::new(AstridEvent::Ipc {
            metadata: EventMetadata::new("test"),
            message: msg,
        })
    }

    // One principal: everything pushed drains in a single call and leaves no
    // buckets or accounted bytes behind.
    #[test]
    fn push_and_drain_single_principal() {
        let mut entry = RouteEntry::new(TopicMatcher::new("t.*"), "capsule-a".into(), None);
        for _ in 0..3 {
            entry.push_with_eviction(
                ipc("t.x", Some("alice")),
                Some("alice".into()),
                MAX_SUBSCRIPTION_BUDGET_BYTES,
            );
        }
        let mut out = Vec::new();
        entry.drr_drain(&mut out, MAX_SUBSCRIPTION_BUDGET_BYTES);
        assert_eq!(out.len(), 3);
        assert_eq!(entry.fanout.len(), 0);
        assert_eq!(entry.total_bytes, 0);
    }

    // Two principals with equal small backlogs both drain fully in one call.
    #[test]
    fn drr_two_principals_yield_equal_counts() {
        let mut entry = RouteEntry::new(TopicMatcher::new("t.*"), "capsule-a".into(), None);
        // Alternate pushes so both principals hold two events each.
        for _ in 0..2 {
            entry.push_with_eviction(
                ipc("t.x", Some("alice")),
                Some("alice".into()),
                MAX_SUBSCRIPTION_BUDGET_BYTES,
            );
            entry.push_with_eviction(
                ipc("t.x", Some("bob")),
                Some("bob".into()),
                MAX_SUBSCRIPTION_BUDGET_BYTES,
            );
        }
        let mut out = Vec::new();
        entry.drr_drain(&mut out, MAX_SUBSCRIPTION_BUDGET_BYTES);
        assert_eq!(out.len(), 4);
        let mut alice_count = 0;
        let mut bob_count = 0;
        for ev in &out {
            if let AstridEvent::Ipc { message, .. } = &**ev {
                match message.principal.as_deref() {
                    Some("alice") => alice_count += 1,
                    Some("bob") => bob_count += 1,
                    _ => {},
                }
            }
        }
        assert_eq!(alice_count, 2);
        assert_eq!(bob_count, 2);
    }

    // Budget sized so the per-principal quantum (budget / 2) is close to two
    // 8 KiB events. NOTE(review): this only checks per-principal totals after the
    // drain, not the order in which events were interleaved.
    #[test]
    fn drr_interleaves_when_quantum_caps_per_round() {
        let payload_size = 8 * 1024;
        let budget = payload_size * 4 + 1024;
        let mut entry = RouteEntry::new(TopicMatcher::new("t.*"), "capsule-a".into(), None);
        entry.push_with_eviction(
            ipc_sized("t.x", Some("alice"), payload_size),
            Some("alice".into()),
            budget,
        );
        entry.push_with_eviction(
            ipc_sized("t.x", Some("bob"), payload_size),
            Some("bob".into()),
            budget,
        );
        entry.push_with_eviction(
            ipc_sized("t.x", Some("alice"), payload_size),
            Some("alice".into()),
            budget,
        );
        entry.push_with_eviction(
            ipc_sized("t.x", Some("bob"), payload_size),
            Some("bob".into()),
            budget,
        );

        let mut out = Vec::new();
        entry.drr_drain(&mut out, budget);
        let mut alice_count = 0;
        let mut bob_count = 0;
        for ev in &out {
            if let AstridEvent::Ipc { message, .. } = &**ev {
                match message.principal.as_deref() {
                    Some("alice") => alice_count += 1,
                    Some("bob") => bob_count += 1,
                    _ => {},
                }
            }
        }
        assert_eq!(alice_count, 2, "alice fairness");
        assert_eq!(bob_count, 2, "bob fairness");
    }

    // A burst from one principal (below the per-principal cap) creates exactly
    // one bucket.
    #[test]
    fn drr_isolates_principals_under_burst() {
        let mut entry = RouteEntry::new(TopicMatcher::new("t.*"), "capsule-a".into(), None);
        for _ in 0..200 {
            entry.push_with_eviction(
                ipc("t.x", Some("alice")),
                Some("alice".into()),
                MAX_SUBSCRIPTION_BUDGET_BYTES,
            );
        }
        assert_eq!(entry.fanout.len(), 1);
        assert!(entry.fanout.contains_key(&Some("alice".into())));
    }

    // Budget fits roughly three 64 KiB events. Pushing more forces head eviction;
    // the victim is the queue with the oldest head stamp, so bob's small event
    // survives while alice's queue sheds heads.
    #[test]
    fn eviction_drops_oldest_head_under_budget() {
        let payload_size = 64 * 1024;
        let budget = payload_size * 3 + 4096;
        let mut entry = RouteEntry::new(TopicMatcher::new("t.*"), "capsule-a".into(), None);

        for _ in 0..3 {
            entry.push_with_eviction(
                ipc_sized("t.alice", Some("alice"), payload_size),
                Some("alice".into()),
                budget,
            );
        }
        // All three fit before any pressure is applied.
        assert_eq!(
            entry
                .fanout
                .get(&Some("alice".into()))
                .map(|q| q.queue.len()),
            Some(3)
        );

        // Bob's quarter-size event overflows the budget and evicts alice's head.
        entry.push_with_eviction(
            ipc_sized("t.bob.terminator", Some("bob"), payload_size / 4),
            Some("bob".into()),
            budget,
        );

        // Another full-size alice event evicts again.
        entry.push_with_eviction(
            ipc_sized("t.alice.new", Some("alice"), payload_size),
            Some("alice".into()),
            budget,
        );

        let alice_q = entry
            .fanout
            .get(&Some("alice".into()))
            .expect("alice queue");
        let bob_q = entry.fanout.get(&Some("bob".into())).expect("bob queue");
        assert!(bob_q.queue.iter().any(|e| match &**e {
            AstridEvent::Ipc { message, .. } => message.topic == "t.bob.terminator",
            _ => false,
        }));
        assert!(
            alice_q.queue.len() < 4,
            "alice queue should have shed at least one head"
        );
    }

    // An event bigger than the whole budget is rejected without creating a bucket.
    #[test]
    fn pathological_message_alone_is_rejected() {
        let small_budget = 1024;
        let mut entry = RouteEntry::new(TopicMatcher::new("t.*"), "capsule-a".into(), None);
        entry.push_with_eviction(
            ipc_sized("t.alice", Some("alice"), 4096),
            Some("alice".into()),
            small_budget,
        );
        assert_eq!(entry.fanout.len(), 0);
        assert_eq!(entry.total_bytes, 0);
    }

    // With 5000 principals the quantum floor (DRR_QUANTUM_MIN_BYTES) keeps every
    // principal's single small event drainable in one call.
    #[test]
    fn fairness_under_5000_principals_makes_progress() {
        let mut entry = RouteEntry::new(TopicMatcher::new("t.*"), "capsule-a".into(), None);
        for i in 0..5000 {
            let p = format!("p{i}");
            entry.push_with_eviction(ipc("t.x", Some(&p)), Some(p), MAX_SUBSCRIPTION_BUDGET_BYTES);
        }
        let mut out = Vec::new();
        entry.drr_drain(&mut out, MAX_SUBSCRIPTION_BUDGET_BYTES);
        assert_eq!(out.len(), 5000);
        assert_eq!(entry.fanout.len(), 0);
    }

    // `accepts`: scoped entries admit only their principal (not even `None`);
    // unscoped entries admit everyone.
    #[test]
    fn accepts_predicate_authz_rule() {
        let scoped = RouteEntry::new(
            TopicMatcher::new("t.*"),
            "capsule-a".into(),
            Some(Some("alice".into())),
        );
        assert!(scoped.accepts(&Some("alice".into())));
        assert!(!scoped.accepts(&Some("bob".into())));
        assert!(!scoped.accepts(&None));

        let unscoped = RouteEntry::new(TopicMatcher::new("t.*"), "capsule-a".into(), None);
        assert!(unscoped.accepts(&Some("alice".into())));
        assert!(unscoped.accepts(&Some("bob".into())));
        assert!(unscoped.accepts(&None));
    }

    // Foreign publishes to a scoped entry are dropped at enqueue: no bucket, no
    // eviction, and only the owner's events drain.
    #[test]
    fn scoped_drops_foreign_at_enqueue() {
        let mut entry = RouteEntry::new(
            TopicMatcher::new("t.*"),
            "capsule-a".into(),
            Some(Some("alice".into())),
        );
        for _ in 0..3 {
            entry.push_with_eviction(
                ipc("t.x", Some("alice")),
                Some("alice".into()),
                MAX_SUBSCRIPTION_BUDGET_BYTES,
            );
        }
        for _ in 0..5 {
            let evicted = entry.push_with_eviction(
                ipc("t.x", Some("bob")),
                Some("bob".into()),
                MAX_SUBSCRIPTION_BUDGET_BYTES,
            );
            assert_eq!(evicted, 0, "foreign push is a no-op, never evicts");
        }
        assert_eq!(entry.fanout.len(), 1);
        assert!(entry.fanout.contains_key(&Some("alice".into())));
        assert!(!entry.fanout.contains_key(&Some("bob".into())));

        let mut out = Vec::new();
        entry.drr_drain(&mut out, MAX_SUBSCRIPTION_BUDGET_BYTES);
        assert_eq!(out.len(), 3, "only alice's three events drain");
        for ev in &out {
            if let AstridEvent::Ipc { message, .. } = &**ev {
                assert_eq!(message.principal.as_deref(), Some("alice"));
            }
        }
    }

    // A foreign flood that would exceed the budget many times over cannot evict
    // the scoped owner's event, because foreign pushes are rejected first.
    #[test]
    fn scoped_budget_not_evictable_by_foreign_burst() {
        let payload_size = 64 * 1024;
        let budget = payload_size * 3 + 4096;
        let mut entry = RouteEntry::new(
            TopicMatcher::new("t.*"),
            "capsule-a".into(),
            Some(Some("alice".into())),
        );
        entry.push_with_eviction(
            ipc_sized("t.alice.keep", Some("alice"), payload_size),
            Some("alice".into()),
            budget,
        );
        for _ in 0..100 {
            entry.push_with_eviction(
                ipc_sized("t.bob.flood", Some("bob"), payload_size),
                Some("bob".into()),
                budget,
            );
        }
        let alice_q = entry
            .fanout
            .get(&Some("alice".into()))
            .expect("alice queue survives");
        assert_eq!(alice_q.queue.len(), 1, "alice's entry never evicted");
        assert!(!entry.fanout.contains_key(&Some("bob".into())));
        assert_eq!(entry.total_bytes, alice_q.bytes);
    }

    // Consecutive allocations differ by exactly one.
    #[test]
    fn alloc_increments_monotonically() {
        let a = SubscriptionRepAllocator::default();
        let n1 = a.next();
        let n2 = a.next();
        assert_eq!(n2, n1.saturating_add(1));
    }
}