use std::collections::{BTreeMap, HashMap};
use std::num::NonZeroUsize;
use std::sync::Arc;
use aion_core::{Event, WorkflowId};
use crate::error::ServerError;
use crate::namespace::NamespaceResolver;
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum GateVerdict {
Permitted {
workflow_type: Option<Arc<str>>,
},
Filtered,
}
pub struct NamespaceEventGate {
resolver: NamespaceResolver,
namespace: String,
verdicts: VerdictCache,
}
impl NamespaceEventGate {
#[must_use]
pub fn new(
resolver: NamespaceResolver,
namespace: String,
verdict_capacity: NonZeroUsize,
) -> Self {
Self {
resolver,
namespace,
verdicts: VerdictCache::new(verdict_capacity),
}
}
pub fn allow(&mut self, workflow_id: WorkflowId) {
self.verdicts.insert(
workflow_id,
CachedVerdict {
permitted: true,
workflow_type: None,
},
);
}
pub async fn admit(&mut self, event: &Event) -> Result<GateVerdict, ServerError> {
let workflow_id = event.workflow_id();
let cached = if let Some(verdict) = self.verdicts.get(workflow_id) {
verdict
} else {
let verdict = match self
.resolver
.workflow_attribution(&self.namespace, workflow_id)
.await?
{
Some(attribution) => CachedVerdict {
permitted: true,
workflow_type: attribution.workflow_type.map(Arc::from),
},
None => CachedVerdict {
permitted: false,
workflow_type: None,
},
};
self.verdicts.insert(workflow_id.clone(), verdict.clone());
verdict
};
if !cached.permitted {
return Ok(GateVerdict::Filtered);
}
let workflow_type = if let Event::WorkflowStarted { workflow_type, .. } = event {
let inline: Arc<str> = Arc::from(workflow_type.as_str());
self.verdicts.refresh_type(workflow_id, Arc::clone(&inline));
Some(inline)
} else {
cached.workflow_type
};
Ok(GateVerdict::Permitted { workflow_type })
}
}
#[derive(Clone, Debug)]
struct CachedVerdict {
permitted: bool,
workflow_type: Option<Arc<str>>,
}
struct VerdictCache {
capacity: NonZeroUsize,
entries: HashMap<WorkflowId, StampedVerdict>,
order: BTreeMap<u64, WorkflowId>,
clock: u64,
}
struct StampedVerdict {
stamp: u64,
verdict: CachedVerdict,
}
impl VerdictCache {
fn new(capacity: NonZeroUsize) -> Self {
Self {
capacity,
entries: HashMap::new(),
order: BTreeMap::new(),
clock: 0,
}
}
fn next_stamp(&mut self) -> u64 {
self.clock += 1;
self.clock
}
fn get(&mut self, workflow_id: &WorkflowId) -> Option<CachedVerdict> {
let stamp = self.next_stamp();
let entry = self.entries.get_mut(workflow_id)?;
self.order.remove(&entry.stamp);
entry.stamp = stamp;
self.order.insert(stamp, workflow_id.clone());
Some(entry.verdict.clone())
}
fn insert(&mut self, workflow_id: WorkflowId, verdict: CachedVerdict) {
let stamp = self.next_stamp();
if let Some(existing) = self.entries.get_mut(&workflow_id) {
self.order.remove(&existing.stamp);
existing.stamp = stamp;
existing.verdict = verdict;
self.order.insert(stamp, workflow_id);
return;
}
if self.entries.len() >= self.capacity.get() {
if let Some((&oldest_stamp, _)) = self.order.first_key_value() {
if let Some(evicted) = self.order.remove(&oldest_stamp) {
self.entries.remove(&evicted);
}
}
}
self.entries
.insert(workflow_id.clone(), StampedVerdict { stamp, verdict });
self.order.insert(stamp, workflow_id);
}
fn refresh_type(&mut self, workflow_id: &WorkflowId, workflow_type: Arc<str>) {
if let Some(entry) = self.entries.get_mut(workflow_id) {
entry.verdict.workflow_type = Some(workflow_type);
}
}
#[cfg(test)]
fn len(&self) -> usize {
debug_assert_eq!(self.entries.len(), self.order.len());
self.entries.len()
}
}
#[cfg(test)]
mod tests {
use std::num::NonZeroUsize;
use aion_core::{Event, EventEnvelope, Payload, WorkflowId};
use async_trait::async_trait;
use super::{GateVerdict, NamespaceEventGate};
use crate::config::NamespaceMode;
use crate::error::ServerError;
use crate::namespace::{
NamespaceResolver, StaticScheduleNamespaces, StaticWorkflowNamespaces, WorkflowAttribution,
WorkflowNamespaceSource,
};
fn capacity(value: usize) -> Result<NonZeroUsize, Box<dyn std::error::Error>> {
NonZeroUsize::new(value).ok_or_else(|| "capacity must be non-zero".into())
}
fn event(seq: u64, workflow_id: &WorkflowId) -> Result<Event, aion_core::PayloadError> {
Ok(Event::SignalReceived {
envelope: EventEnvelope {
seq,
recorded_at: chrono::Utc::now(),
workflow_id: workflow_id.clone(),
},
name: "ship".to_owned(),
payload: Payload::from_json(&serde_json::json!({ "seq": seq }))?,
})
}
fn started(
seq: u64,
workflow_id: &WorkflowId,
workflow_type: &str,
) -> Result<Event, aion_core::PayloadError> {
Ok(Event::WorkflowStarted {
envelope: EventEnvelope {
seq,
recorded_at: chrono::Utc::now(),
workflow_id: workflow_id.clone(),
},
workflow_type: workflow_type.to_owned(),
input: Payload::from_json(&serde_json::json!({ "seq": seq }))?,
run_id: aion_core::RunId::new(uuid::Uuid::from_u128(u128::from(seq))),
parent_run_id: None,
})
}
fn resolver(ownership: StaticWorkflowNamespaces) -> NamespaceResolver {
NamespaceResolver::authorization_only(
NamespaceMode::SharedEngine,
ownership,
StaticScheduleNamespaces::default(),
)
}
fn permitted(verdict: &GateVerdict) -> bool {
matches!(verdict, GateVerdict::Permitted { .. })
}
#[tokio::test]
async fn gate_permits_own_namespace_and_filters_foreign_and_unknown()
-> Result<(), Box<dyn std::error::Error>> {
let own = WorkflowId::new(uuid::Uuid::from_u128(1));
let foreign = WorkflowId::new(uuid::Uuid::from_u128(2));
let unknown = WorkflowId::new(uuid::Uuid::from_u128(3));
let ownership = StaticWorkflowNamespaces::default();
ownership.record(own.clone(), "tenant-a")?;
ownership.record(foreign.clone(), "tenant-b")?;
let mut gate =
NamespaceEventGate::new(resolver(ownership), "tenant-a".to_owned(), capacity(8)?);
assert!(permitted(&gate.admit(&event(1, &own)?).await?));
assert_eq!(
gate.admit(&event(1, &foreign)?).await?,
GateVerdict::Filtered
);
assert_eq!(
gate.admit(&event(1, &unknown)?).await?,
GateVerdict::Filtered
);
Ok(())
}
#[tokio::test]
async fn admit_carries_the_recorded_workflow_type() -> Result<(), Box<dyn std::error::Error>> {
let own = WorkflowId::new(uuid::Uuid::from_u128(1));
let ownership = StaticWorkflowNamespaces::default();
ownership.record_with_type(own.clone(), "tenant-a", "checkout")?;
let mut gate =
NamespaceEventGate::new(resolver(ownership), "tenant-a".to_owned(), capacity(8)?);
let verdict = gate.admit(&event(5, &own)?).await?;
let GateVerdict::Permitted { workflow_type } = verdict else {
return Err("owned workflow must be permitted".into());
};
assert_eq!(workflow_type.as_deref(), Some("checkout"));
Ok(())
}
#[tokio::test]
async fn workflow_started_refreshes_the_cached_type_inline()
-> Result<(), Box<dyn std::error::Error>> {
let own = WorkflowId::new(uuid::Uuid::from_u128(1));
let ownership = StaticWorkflowNamespaces::default();
ownership.record_with_type(own.clone(), "tenant-a", "checkout")?;
let mut gate =
NamespaceEventGate::new(resolver(ownership), "tenant-a".to_owned(), capacity(8)?);
let first = gate.admit(&event(1, &own)?).await?;
let GateVerdict::Permitted { workflow_type } = first else {
return Err("owned workflow must be permitted".into());
};
assert_eq!(workflow_type.as_deref(), Some("checkout"));
let started_verdict = gate.admit(&started(2, &own, "checkout-v2")?).await?;
let GateVerdict::Permitted { workflow_type } = started_verdict else {
return Err("owned workflow must be permitted".into());
};
assert_eq!(workflow_type.as_deref(), Some("checkout-v2"));
let after = gate.admit(&event(3, &own)?).await?;
let GateVerdict::Permitted { workflow_type } = after else {
return Err("owned workflow must be permitted".into());
};
assert_eq!(workflow_type.as_deref(), Some("checkout-v2"));
Ok(())
}
struct CountingOwnership {
inner: StaticWorkflowNamespaces,
reads: std::sync::Arc<std::sync::atomic::AtomicUsize>,
fail_after: usize,
}
#[async_trait]
impl WorkflowNamespaceSource for CountingOwnership {
async fn workflow_attribution(
&self,
workflow_id: &WorkflowId,
) -> Result<Option<WorkflowAttribution>, ServerError> {
let reads = self.reads.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
if reads >= self.fail_after {
return Err(ServerError::Config {
message: "ownership source unavailable".to_owned(),
});
}
self.inner.workflow_attribution(workflow_id).await
}
}
fn counting_resolver(
inner: StaticWorkflowNamespaces,
fail_after: usize,
) -> (
NamespaceResolver,
std::sync::Arc<std::sync::atomic::AtomicUsize>,
) {
let reads = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
let counting = CountingOwnership {
inner,
reads: std::sync::Arc::clone(&reads),
fail_after,
};
(
NamespaceResolver::authorization_only(
NamespaceMode::SharedEngine,
counting,
StaticScheduleNamespaces::default(),
),
reads,
)
}
#[tokio::test]
async fn verdicts_are_cached_per_workflow() -> Result<(), Box<dyn std::error::Error>> {
let own = WorkflowId::new(uuid::Uuid::from_u128(1));
let ownership = StaticWorkflowNamespaces::default();
ownership.record(own.clone(), "tenant-a")?;
let (resolver, _reads) = counting_resolver(ownership, 1);
let mut gate = NamespaceEventGate::new(resolver, "tenant-a".to_owned(), capacity(8)?);
assert!(permitted(&gate.admit(&event(1, &own)?).await?));
assert!(permitted(&gate.admit(&event(2, &own)?).await?));
Ok(())
}
#[tokio::test]
async fn verdict_cache_is_bounded_and_eviction_rereads_consistently()
-> Result<(), Box<dyn std::error::Error>> {
let first = WorkflowId::new(uuid::Uuid::from_u128(1));
let second = WorkflowId::new(uuid::Uuid::from_u128(2));
let third = WorkflowId::new(uuid::Uuid::from_u128(3));
let ownership = StaticWorkflowNamespaces::default();
ownership.record(first.clone(), "tenant-a")?;
ownership.record(second.clone(), "tenant-b")?;
ownership.record(third.clone(), "tenant-a")?;
let (resolver, reads) = counting_resolver(ownership, usize::MAX);
let mut gate = NamespaceEventGate::new(resolver, "tenant-a".to_owned(), capacity(2)?);
assert!(permitted(&gate.admit(&event(1, &first)?).await?));
assert_eq!(
gate.admit(&event(1, &second)?).await?,
GateVerdict::Filtered
);
assert!(permitted(&gate.admit(&event(1, &third)?).await?));
assert_eq!(gate.verdicts.len(), 2, "cache must never exceed its bound");
assert_eq!(reads.load(std::sync::atomic::Ordering::SeqCst), 3);
assert!(permitted(&gate.admit(&event(2, &first)?).await?));
assert_eq!(gate.verdicts.len(), 2, "cache must never exceed its bound");
assert_eq!(reads.load(std::sync::atomic::Ordering::SeqCst), 4);
Ok(())
}
#[tokio::test]
async fn lru_eviction_respects_recency() -> Result<(), Box<dyn std::error::Error>> {
let first = WorkflowId::new(uuid::Uuid::from_u128(1));
let second = WorkflowId::new(uuid::Uuid::from_u128(2));
let third = WorkflowId::new(uuid::Uuid::from_u128(3));
let ownership = StaticWorkflowNamespaces::default();
ownership.record(first.clone(), "tenant-a")?;
ownership.record(second.clone(), "tenant-a")?;
ownership.record(third.clone(), "tenant-a")?;
let (resolver, reads) = counting_resolver(ownership, usize::MAX);
let mut gate = NamespaceEventGate::new(resolver, "tenant-a".to_owned(), capacity(2)?);
assert!(permitted(&gate.admit(&event(1, &first)?).await?));
assert!(permitted(&gate.admit(&event(1, &second)?).await?));
assert!(permitted(&gate.admit(&event(2, &first)?).await?));
assert!(permitted(&gate.admit(&event(1, &third)?).await?));
assert_eq!(reads.load(std::sync::atomic::Ordering::SeqCst), 3);
assert!(permitted(&gate.admit(&event(3, &first)?).await?));
assert_eq!(reads.load(std::sync::atomic::Ordering::SeqCst), 3);
assert!(permitted(&gate.admit(&event(2, &second)?).await?));
assert_eq!(reads.load(std::sync::atomic::Ordering::SeqCst), 4);
Ok(())
}
#[tokio::test]
async fn pre_seeded_target_never_consults_the_ownership_source()
-> Result<(), Box<dyn std::error::Error>> {
let own = WorkflowId::new(uuid::Uuid::from_u128(1));
let (resolver, _reads) = counting_resolver(StaticWorkflowNamespaces::default(), 0);
let mut gate = NamespaceEventGate::new(resolver, "tenant-a".to_owned(), capacity(8)?);
gate.allow(own.clone());
assert!(permitted(&gate.admit(&event(1, &own)?).await?));
Ok(())
}
#[tokio::test]
async fn ownership_read_failure_propagates_instead_of_guessing()
-> Result<(), Box<dyn std::error::Error>> {
let own = WorkflowId::new(uuid::Uuid::from_u128(1));
let (resolver, _reads) = counting_resolver(StaticWorkflowNamespaces::default(), 0);
let mut gate = NamespaceEventGate::new(resolver, "tenant-a".to_owned(), capacity(8)?);
let error = gate.admit(&event(1, &own)?).await.err();
assert!(matches!(error, Some(ServerError::Config { .. })));
Ok(())
}
}