use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Instant;
use tokio::sync::Notify;
use uuid::Uuid;
use crate::event::AstridEvent;
use crate::route::matcher::{TopicMatcher, ipc_size_of, principal_class_label};
/// 1 MiB byte budget; the kind of value callers hand to the route queue as `budget_bytes`.
pub const MAX_SUBSCRIPTION_BUDGET_BYTES: usize = 1 << 20;
/// 4 KiB floor for the per-visit quantum used by the deficit-round-robin drain.
pub const DRR_QUANTUM_MIN_BYTES: usize = 1 << 12;
/// Queue length per principal above which the oldest queued event is dropped.
pub(crate) const PENDING_PER_PRINCIPAL_FALLBACK: usize = 256;
/// Counter name incremented when an event is rejected or dropped from a route queue.
pub const METRIC_ROUTE_BYTE_EVICTIONS_TOTAL: &str = "astrid_capsule_route_byte_evictions_total";
/// Counter name incremented when a drain visit delivers nothing for a non-empty queue.
pub const METRIC_ROUTE_QUANTUM_STARVED_TOTAL: &str = "astrid_capsule_route_quantum_starved_total";
/// Identity of one route: a capsule, the topic pattern it subscribed with, and a numeric
/// per-subscription discriminator. Derives `Eq` and `Hash`, so it is usable as a map key.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct RouteKey {
/// UUID of the capsule.
pub capsule_uuid: Uuid,
/// Topic pattern string of the subscription.
pub topic_pattern: String,
/// Numeric discriminator of the subscription.
// NOTE(review): presumably issued by `SubscriptionRepAllocator::next` — confirm at the call site.
pub subscription_rep: u64,
}
/// Principal that queued events are bucketed by; `None` is rendered as `<none>` in audit logs.
pub type PrincipalKey = Option<String>;
/// FIFO of pending events for a single principal, with the counters the route needs for
/// byte accounting, oldest-head eviction and deficit round robin.
#[derive(Debug)]
pub(crate) struct PrincipalQueue {
/// Pending events, oldest at the front.
pub(crate) queue: VecDeque<Arc<AstridEvent>>,
/// Running sum of `ipc_size_of` over the events currently in `queue`.
pub(crate) bytes: usize,
/// Timestamp attached to the current head; `None` while the queue is empty. It is set when an
/// event is pushed onto an empty queue and re-stamped with the current time whenever the head
/// is popped and events remain, so it is not the new head's original enqueue time.
pub(crate) head_enqueued_at: Option<Instant>,
/// Byte credit accumulated by the round-robin drain and spent as events are delivered.
pub(crate) deficit: usize,
}
impl PrincipalQueue {
    /// Creates an empty queue: no events, zero bytes, no head timestamp, zero deficit.
    fn new() -> Self {
        let queue = VecDeque::new();
        Self {
            deficit: 0,
            head_enqueued_at: None,
            bytes: 0,
            queue,
        }
    }
}
/// Delivery state of one route: a queue per principal plus the bookkeeping used for
/// byte-budget eviction and round-robin draining.
#[derive(Debug)]
pub(crate) struct RouteEntry {
/// Topic matcher this entry was created with.
pub(crate) matcher: TopicMatcher,
/// Pending events bucketed by principal.
pub(crate) fanout: HashMap<PrincipalKey, PrincipalQueue>,
/// Visiting order for the round-robin drain; a key is appended when its bucket is created.
pub(crate) principal_order: VecDeque<PrincipalKey>,
/// Running sum of `bytes` over all buckets in `fanout`.
pub(crate) total_bytes: usize,
/// Value emitted as the `capsule` field/label in audit logs and metrics.
pub(crate) capsule_id_label: String,
/// When `Some`, only publishers equal to the contained principal are accepted; `None`
/// accepts every publisher.
pub(crate) scope: Option<PrincipalKey>,
/// Notification handle created alongside the entry.
// NOTE(review): presumably used to wake the consumer of this route — confirm against callers.
pub(crate) notify: Arc<Notify>,
}
impl RouteEntry {
/// Builds an entry with no queued events.
///
/// `matcher`, `capsule_id_label` and `scope` are stored as given; the per-principal map and
/// visiting order start empty, the byte total starts at zero, and a fresh `Notify` is created.
pub(crate) fn new(
    matcher: TopicMatcher,
    capsule_id_label: String,
    scope: Option<PrincipalKey>,
) -> Self {
    let notify = Arc::new(Notify::new());
    let fanout = HashMap::new();
    let principal_order = VecDeque::new();
    Self {
        matcher,
        capsule_id_label,
        scope,
        notify,
        fanout,
        principal_order,
        total_bytes: 0,
    }
}
/// Reports whether an event published by `publisher` may be queued on this route.
///
/// An unscoped entry accepts everyone; a scoped entry accepts only a publisher equal to the
/// scope's principal (so a `None` publisher never matches a `Some(..)` scope principal).
pub(crate) fn accepts(&self, publisher: &PrincipalKey) -> bool {
    match &self.scope {
        Some(required) => required == publisher,
        None => true,
    }
}
/// Queues `event` under `principal`, evicting older events so the route stays within
/// `budget_bytes`.
///
/// Steps, in order:
/// 1. A publisher rejected by [`Self::accepts`] is ignored and `0` is returned.
/// 2. An event whose own size (per `ipc_size_of`) exceeds `budget_bytes` is rejected: it is
///    audit-logged, the eviction counter is incremented, and `1` is returned.
/// 3. Heads of the oldest queues are evicted until the new event fits the budget.
/// 4. The event is appended to the principal's bucket (created on first use).
/// 5. If that bucket now holds more than `PENDING_PER_PRINCIPAL_FALLBACK` events, its oldest
///    event is dropped, audit-logged and counted in the eviction metric.
///
/// Returns the number of evictions performed in step 3 (or the `0`/`1` described above); a
/// drop made in step 5 is not included in the returned count.
pub(crate) fn push_with_eviction(
    &mut self,
    event: Arc<AstridEvent>,
    principal: PrincipalKey,
    budget_bytes: usize,
) -> usize {
    // Out-of-scope publishers leave the entry untouched.
    if !self.accepts(&principal) {
        return 0;
    }

    let incoming_bytes = ipc_size_of(&event);
    if incoming_bytes > budget_bytes {
        // No amount of eviction can make room for this event, so refuse it outright.
        let class = principal_class_label(principal.as_deref());
        tracing::error!(
            target: "astrid.audit.ipc",
            security_event = true,
            capsule = %self.capsule_id_label,
            principal = principal.as_deref().unwrap_or("<none>"),
            msg_size = incoming_bytes,
            budget_bytes,
            "ipc::route: incoming message exceeds global byte budget, rejecting publish",
        );
        metrics::counter!(
            METRIC_ROUTE_BYTE_EVICTIONS_TOTAL,
            "capsule" => self.capsule_id_label.clone(),
            "principal_class" => class,
        )
        .increment(1);
        return 1;
    }

    // Shed oldest heads until the newcomer fits, or nothing is left to shed.
    let mut evicted_count = 0usize;
    loop {
        let fits = self.total_bytes.saturating_add(incoming_bytes) <= budget_bytes;
        if fits || !self.evict_oldest_head() {
            break;
        }
        evicted_count = evicted_count.saturating_add(1);
    }

    let now = Instant::now();
    // Checked before `entry()` so we know whether the key must join the visiting order.
    let first_sighting = !self.fanout.contains_key(&principal);
    let slot = self
        .fanout
        .entry(principal.clone())
        .or_insert_with(PrincipalQueue::new);
    if slot.queue.is_empty() {
        slot.head_enqueued_at = Some(now);
    }
    slot.queue.push_back(event);
    slot.bytes = slot.bytes.saturating_add(incoming_bytes);
    self.total_bytes = self.total_bytes.saturating_add(incoming_bytes);

    // Per-principal length cap: drop this principal's own oldest event when exceeded.
    if slot.queue.len() > PENDING_PER_PRINCIPAL_FALLBACK {
        if let Some(dropped) = slot.queue.pop_front() {
            let dropped_bytes = ipc_size_of(&dropped);
            slot.bytes = slot.bytes.saturating_sub(dropped_bytes);
            self.total_bytes = self.total_bytes.saturating_sub(dropped_bytes);
            // The new head is stamped with the current time, not its original enqueue time.
            slot.head_enqueued_at = (!slot.queue.is_empty()).then(Instant::now);
            let class = principal_class_label(principal.as_deref());
            tracing::error!(
                target: "astrid.audit.ipc",
                security_event = true,
                capsule = %self.capsule_id_label,
                principal = principal.as_deref().unwrap_or("<none>"),
                cap = PENDING_PER_PRINCIPAL_FALLBACK,
                "ipc::route: per-principal queue cap reached, dropping oldest",
            );
            metrics::counter!(
                METRIC_ROUTE_BYTE_EVICTIONS_TOTAL,
                "capsule" => self.capsule_id_label.clone(),
                "principal_class" => class,
            )
            .increment(1);
        }
    }

    if first_sighting {
        self.principal_order.push_back(principal);
    }
    evicted_count
}
/// Drops the front event of the bucket whose head timestamp is the earliest.
///
/// Byte counters are reduced by the evicted event's `ipc_size_of`, the bucket's head timestamp
/// is cleared (queue now empty) or re-stamped with the current time, and the eviction is
/// audit-logged and counted. An emptied bucket is left in `fanout`.
///
/// Returns `true` if an event was evicted, `false` if there was nothing to evict.
fn evict_oldest_head(&mut self) -> bool {
    let victim = match self.oldest_head_key() {
        Some(key) => key,
        None => return false,
    };
    let victim_queue = match self.fanout.get_mut(&victim) {
        Some(found) => found,
        None => return false,
    };
    let removed = match victim_queue.queue.pop_front() {
        Some(head) => head,
        None => return false,
    };

    let removed_bytes = ipc_size_of(&removed);
    victim_queue.bytes = victim_queue.bytes.saturating_sub(removed_bytes);
    self.total_bytes = self.total_bytes.saturating_sub(removed_bytes);
    victim_queue.head_enqueued_at = (!victim_queue.queue.is_empty()).then(Instant::now);

    // IPC events are reported by topic; anything else by its event type.
    let evicted_topic = if let AstridEvent::Ipc { message, .. } = &*removed {
        message.topic.clone()
    } else {
        removed.event_type().to_string()
    };
    let class = principal_class_label(victim.as_deref());
    tracing::error!(
        target: "astrid.audit.ipc",
        security_event = true,
        capsule = %self.capsule_id_label,
        principal = victim.as_deref().unwrap_or("<none>"),
        evicted_topic = %evicted_topic,
        total_bytes = self.total_bytes,
        "ipc::route: global byte budget exhausted, dropping head of oldest queue",
    );
    metrics::counter!(
        METRIC_ROUTE_BYTE_EVICTIONS_TOTAL,
        "capsule" => self.capsule_id_label.clone(),
        "principal_class" => class,
    )
    .increment(1);
    true
}
/// Returns the key of the bucket with the earliest head timestamp, or `None` when no bucket
/// has a head (all queues empty, or no buckets at all).
///
/// Keys are compared by reference and only the winning key is cloned, so a scan over many
/// principals does not allocate a `String` per bucket.
fn oldest_head_key(&self) -> Option<PrincipalKey> {
    self.fanout
        .iter()
        .filter_map(|(key, bucket)| bucket.head_enqueued_at.map(|stamp| (stamp, key)))
        .min_by_key(|(stamp, _)| *stamp)
        .map(|(_, key)| key.clone())
}
/// Drains queued events into `out` using deficit round robin across principals.
///
/// Principals are visited in `principal_order`. Each visit adds `quantum` bytes to the
/// principal's `deficit`, then pops events from the front of its queue while the front event's
/// size (per `ipc_size_of`) fits both the deficit and what is left of `budget`. Rounds repeat
/// until a whole round delivers nothing or `served` reaches `budget`.
///
/// Returns the summed size of the events appended to `out`; returns `0` immediately when there
/// are no buckets or `budget` is zero.
pub(crate) fn drr_drain(&mut self, out: &mut Vec<Arc<AstridEvent>>, budget: usize) -> usize {
if self.fanout.is_empty() || budget == 0 {
return 0;
}
let mut served = 0usize;
// Quantum: the budget split evenly over the principals in the rotation, floored at
// DRR_QUANTUM_MIN_BYTES. `total` is clamped to at least 1, so the division always succeeds.
let total = self.principal_order.len().max(1);
let quantum = std::cmp::max(
DRR_QUANTUM_MIN_BYTES,
budget.checked_div(total).unwrap_or(0),
);
// Each pass of this loop is one full round over the rotation.
loop {
let mut progress = false;
// Snapshot the length so keys re-queued during this round are not visited twice in it.
let visit = self.principal_order.len();
for _ in 0..visit {
let Some(key) = self.principal_order.pop_front() else {
break;
};
// A key without a bucket is simply dropped from the rotation (it is not pushed back).
let Some(bucket) = self.fanout.get_mut(&key) else {
continue;
};
bucket.deficit = bucket.deficit.saturating_add(quantum);
let mut bucket_progress = false;
while let Some(front) = bucket.queue.front() {
let sz = ipc_size_of(front);
// Stop this visit when the head exceeds the deficit or would overshoot the budget.
if sz > bucket.deficit || served.saturating_add(sz) > budget {
break;
}
let msg = bucket.queue.pop_front().expect("front checked above");
bucket.deficit = bucket.deficit.saturating_sub(sz);
bucket.bytes = bucket.bytes.saturating_sub(sz);
self.total_bytes = self.total_bytes.saturating_sub(sz);
served = served.saturating_add(sz);
out.push(msg);
bucket_progress = true;
// The new head is stamped with the current time, not its original enqueue time.
bucket.head_enqueued_at = if bucket.queue.is_empty() {
None
} else {
Some(Instant::now())
};
}
progress |= bucket_progress;
// Counted whenever a visit delivers nothing while events remain, whichever of the two
// checks above (deficit or budget) stopped it.
if !bucket_progress && !bucket.queue.is_empty() {
metrics::counter!(
METRIC_ROUTE_QUANTUM_STARVED_TOTAL,
"capsule" => self.capsule_id_label.clone(),
"principal_class" => principal_class_label(key.as_deref()),
)
.increment(1);
}
// Empty buckets leave both the map and the rotation (discarding their deficit);
// everyone else goes to the back of the rotation.
if bucket.queue.is_empty() {
self.fanout.remove(&key);
} else {
self.principal_order.push_back(key);
}
}
if !progress || served >= budget {
break;
}
}
served
}
/// Number of principal buckets currently held in `fanout`.
///
/// A bucket emptied by eviction stays in the map until a drain removes it, so it is still
/// counted here.
pub(crate) fn active_principals(&self) -> usize {
    self.fanout.keys().len()
}
}
/// Hands out numeric subscription discriminators from a shared atomic counter.
#[derive(Debug, Default)]
pub(crate) struct SubscriptionRepAllocator(pub(crate) AtomicU64);

impl SubscriptionRepAllocator {
    /// Advances the counter by one and returns the value just past the one it held before,
    /// so a default-constructed allocator yields `1`, `2`, `3`, ...
    ///
    /// The returned value saturates at `u64::MAX`; the stored counter itself wraps.
    pub(crate) fn next(&self) -> u64 {
        let Self(counter) = self;
        let previous = counter.fetch_add(1, Ordering::Relaxed);
        previous.saturating_add(1)
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::event::EventMetadata;
use crate::ipc::{IpcMessage, IpcPayload};
use serde_json::json;
use uuid::Uuid;
/// Test helper: wraps an IPC message with an empty JSON object payload on `topic`, attributed
/// to `principal`, in a shared `AstridEvent::Ipc`.
fn ipc(topic: &str, principal: Option<&str>) -> Arc<AstridEvent> {
    let payload = IpcPayload::RawJson(json!({}));
    let mut message = IpcMessage::new(topic, payload, Uuid::nil());
    message.principal = principal.map(str::to_owned);
    let metadata = EventMetadata::new("test");
    Arc::new(AstridEvent::Ipc { metadata, message })
}
/// Test helper: like `ipc`, but the JSON payload carries a string of `payload_bytes` `x`
/// characters under the key `"p"`, so the event's size can be steered.
fn ipc_sized(topic: &str, principal: Option<&str>, payload_bytes: usize) -> Arc<AstridEvent> {
    let filler = "x".repeat(payload_bytes);
    let payload = IpcPayload::RawJson(json!({ "p": filler }));
    let mut message = IpcMessage::new(topic, payload, Uuid::nil());
    message.principal = principal.map(str::to_owned);
    let metadata = EventMetadata::new("test");
    Arc::new(AstridEvent::Ipc { metadata, message })
}
/// Three events pushed for one principal all drain, leaving no bucket and no bytes behind.
#[test]
fn push_and_drain_single_principal() {
    let budget = MAX_SUBSCRIPTION_BUDGET_BYTES;
    let mut route = RouteEntry::new(TopicMatcher::new("t.*"), "capsule-a".into(), None);
    (0..3).for_each(|_| {
        route.push_with_eviction(ipc("t.x", Some("alice")), Some("alice".into()), budget);
    });

    let mut drained = Vec::new();
    route.drr_drain(&mut drained, budget);

    assert_eq!(drained.len(), 3);
    assert_eq!(route.fanout.len(), 0);
    assert_eq!(route.total_bytes, 0);
}
/// Two principals with two events each: a full-budget drain delivers all four, two apiece.
#[test]
fn drr_two_principals_yield_equal_counts() {
    let budget = MAX_SUBSCRIPTION_BUDGET_BYTES;
    let mut route = RouteEntry::new(TopicMatcher::new("t.*"), "capsule-a".into(), None);
    // Alternate alice/bob so both buckets fill in lock-step.
    for who in ["alice", "bob", "alice", "bob"] {
        route.push_with_eviction(ipc("t.x", Some(who)), Some(who.into()), budget);
    }

    let mut delivered = Vec::new();
    route.drr_drain(&mut delivered, budget);
    assert_eq!(delivered.len(), 4);

    let (mut alice_count, mut bob_count) = (0, 0);
    for event in delivered.iter() {
        let AstridEvent::Ipc { message, .. } = &**event else {
            continue;
        };
        let who = message.principal.as_deref();
        if who == Some("alice") {
            alice_count += 1;
        } else if who == Some("bob") {
            bob_count += 1;
        }
    }
    assert_eq!(alice_count, 2);
    assert_eq!(bob_count, 2);
}
/// With 8 KiB payloads and a budget of four payloads plus 1 KiB, both principals still get
/// both of their events delivered by a single drain.
#[test]
fn drr_interleaves_when_quantum_caps_per_round() {
    let payload_size = 8 * 1024;
    let budget = payload_size * 4 + 1024;
    let mut route = RouteEntry::new(TopicMatcher::new("t.*"), "capsule-a".into(), None);
    for who in ["alice", "bob", "alice", "bob"] {
        route.push_with_eviction(
            ipc_sized("t.x", Some(who), payload_size),
            Some(who.into()),
            budget,
        );
    }

    let mut delivered = Vec::new();
    route.drr_drain(&mut delivered, budget);

    let (mut alice_count, mut bob_count) = (0, 0);
    for event in delivered.iter() {
        let AstridEvent::Ipc { message, .. } = &**event else {
            continue;
        };
        let who = message.principal.as_deref();
        if who == Some("alice") {
            alice_count += 1;
        } else if who == Some("bob") {
            bob_count += 1;
        }
    }
    assert_eq!(alice_count, 2, "alice fairness");
    assert_eq!(bob_count, 2, "bob fairness");
}
/// A burst of 200 events from one principal lands in exactly one bucket, keyed by that principal.
#[test]
fn drr_isolates_principals_under_burst() {
    let alice: PrincipalKey = Some("alice".into());
    let mut route = RouteEntry::new(TopicMatcher::new("t.*"), "capsule-a".into(), None);
    (0..200).for_each(|_| {
        route.push_with_eviction(
            ipc("t.x", Some("alice")),
            alice.clone(),
            MAX_SUBSCRIPTION_BUDGET_BYTES,
        );
    });
    assert_eq!(route.fanout.len(), 1);
    assert!(route.fanout.contains_key(&alice));
}
/// Fills the budget with three of alice's events, then pushes a small event for bob and another
/// large one for alice: bob's event must survive and alice must hold fewer than four events.
#[test]
fn eviction_drops_oldest_head_under_budget() {
    let payload_size = 64 * 1024;
    let budget = payload_size * 3 + 4096;
    let alice: PrincipalKey = Some("alice".into());
    let bob: PrincipalKey = Some("bob".into());
    let mut route = RouteEntry::new(TopicMatcher::new("t.*"), "capsule-a".into(), None);

    (0..3).for_each(|_| {
        route.push_with_eviction(
            ipc_sized("t.alice", Some("alice"), payload_size),
            alice.clone(),
            budget,
        );
    });
    let alice_len_before = route.fanout.get(&alice).map(|q| q.queue.len());
    assert_eq!(alice_len_before, Some(3));

    route.push_with_eviction(
        ipc_sized("t.bob.terminator", Some("bob"), payload_size / 4),
        bob.clone(),
        budget,
    );
    route.push_with_eviction(
        ipc_sized("t.alice.new", Some("alice"), payload_size),
        alice.clone(),
        budget,
    );

    let alice_q = route.fanout.get(&alice).expect("alice queue");
    let bob_q = route.fanout.get(&bob).expect("bob queue");
    let bob_kept_terminator = bob_q.queue.iter().any(|e| {
        matches!(
            &**e,
            AstridEvent::Ipc { message, .. } if message.topic == "t.bob.terminator"
        )
    });
    assert!(bob_kept_terminator);
    assert!(
        alice_q.queue.len() < 4,
        "alice queue should have shed at least one head"
    );
}
/// An event carrying a 4096-character payload pushed under a 1024-byte budget is not queued.
#[test]
fn pathological_message_alone_is_rejected() {
    let small_budget = 1024;
    let oversized = ipc_sized("t.alice", Some("alice"), 4096);
    let mut route = RouteEntry::new(TopicMatcher::new("t.*"), "capsule-a".into(), None);

    route.push_with_eviction(oversized, Some("alice".into()), small_budget);

    assert_eq!(route.fanout.len(), 0);
    assert_eq!(route.total_bytes, 0);
}
/// One small event from each of 5000 distinct principals: a single full-budget drain delivers
/// all of them and leaves no bucket behind.
#[test]
fn fairness_under_5000_principals_makes_progress() {
    let mut route = RouteEntry::new(TopicMatcher::new("t.*"), "capsule-a".into(), None);
    (0..5000).map(|i| format!("p{i}")).for_each(|name| {
        let event = ipc("t.x", Some(&name));
        route.push_with_eviction(event, Some(name), MAX_SUBSCRIPTION_BUDGET_BYTES);
    });

    let mut delivered = Vec::new();
    route.drr_drain(&mut delivered, MAX_SUBSCRIPTION_BUDGET_BYTES);

    assert_eq!(delivered.len(), 5000);
    assert_eq!(route.fanout.len(), 0);
}
/// A scoped entry accepts only its own principal; an unscoped entry accepts anyone, including
/// a publisher with no principal.
#[test]
fn accepts_predicate_authz_rule() {
    let named = |who: &str| -> PrincipalKey { Some(who.to_string()) };
    let anonymous: PrincipalKey = None;

    let scoped = RouteEntry::new(
        TopicMatcher::new("t.*"),
        "capsule-a".into(),
        Some(named("alice")),
    );
    assert!(scoped.accepts(&named("alice")));
    assert!(!scoped.accepts(&named("bob")));
    assert!(!scoped.accepts(&anonymous));

    let unscoped = RouteEntry::new(TopicMatcher::new("t.*"), "capsule-a".into(), None);
    assert!(unscoped.accepts(&named("alice")));
    assert!(unscoped.accepts(&named("bob")));
    assert!(unscoped.accepts(&anonymous));
}
/// On an entry scoped to alice, bob's pushes return 0 and create no bucket; only alice's three
/// events come out of the drain.
#[test]
fn scoped_drops_foreign_at_enqueue() {
    let alice: PrincipalKey = Some("alice".into());
    let bob: PrincipalKey = Some("bob".into());
    let mut route = RouteEntry::new(
        TopicMatcher::new("t.*"),
        "capsule-a".into(),
        Some(alice.clone()),
    );

    (0..3).for_each(|_| {
        route.push_with_eviction(
            ipc("t.x", Some("alice")),
            alice.clone(),
            MAX_SUBSCRIPTION_BUDGET_BYTES,
        );
    });
    (0..5).for_each(|_| {
        let evicted = route.push_with_eviction(
            ipc("t.x", Some("bob")),
            bob.clone(),
            MAX_SUBSCRIPTION_BUDGET_BYTES,
        );
        assert_eq!(evicted, 0, "foreign push is a no-op, never evicts");
    });

    assert_eq!(route.fanout.len(), 1);
    assert!(route.fanout.contains_key(&alice));
    assert!(!route.fanout.contains_key(&bob));

    let mut delivered = Vec::new();
    route.drr_drain(&mut delivered, MAX_SUBSCRIPTION_BUDGET_BYTES);
    assert_eq!(delivered.len(), 3, "only alice's three events drain");
    for event in delivered.iter() {
        let AstridEvent::Ipc { message, .. } = &**event else {
            continue;
        };
        assert_eq!(message.principal.as_deref(), Some("alice"));
    }
}
/// On an entry scoped to alice, a flood of 100 large events from bob neither evicts alice's
/// queued event nor changes the byte total.
#[test]
fn scoped_budget_not_evictable_by_foreign_burst() {
    let payload_size = 64 * 1024;
    let budget = payload_size * 3 + 4096;
    let alice: PrincipalKey = Some("alice".into());
    let bob: PrincipalKey = Some("bob".into());
    let mut route = RouteEntry::new(
        TopicMatcher::new("t.*"),
        "capsule-a".into(),
        Some(alice.clone()),
    );

    route.push_with_eviction(
        ipc_sized("t.alice.keep", Some("alice"), payload_size),
        alice.clone(),
        budget,
    );
    (0..100).for_each(|_| {
        route.push_with_eviction(
            ipc_sized("t.bob.flood", Some("bob"), payload_size),
            bob.clone(),
            budget,
        );
    });

    let alice_q = route.fanout.get(&alice).expect("alice queue survives");
    assert_eq!(alice_q.queue.len(), 1, "alice's entry never evicted");
    assert!(!route.fanout.contains_key(&bob));
    assert_eq!(route.total_bytes, alice_q.bytes);
}
/// Two consecutive allocations differ by exactly one.
#[test]
fn alloc_increments_monotonically() {
    let allocator = SubscriptionRepAllocator::default();
    let first = allocator.next();
    let second = allocator.next();
    assert_eq!(second, first.saturating_add(1));
}
}