use std::collections::{BTreeSet, HashMap};
use std::sync::{Arc, RwLock};
use aion::Engine;
use aion_core::{ScheduleId, SearchAttributeValue, WorkflowId, search_attributes_from_events};
use aion_proto::WireError;
use async_trait::async_trait;
use crate::config::{NamespaceConfig, NamespaceMode};
use crate::error::ServerError;
use super::schedule_source::{HistoryScheduleNamespaceSource, ScheduleNamespaceSource};
pub const NAMESPACE_ATTRIBUTE: &str = "aion.namespace";
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum GrantSource {
NamespacesHeader,
TokenClaim,
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct CallerIdentity {
subject: String,
namespaces: BTreeSet<String>,
denial_reason: Option<String>,
grant_source: GrantSource,
}
impl CallerIdentity {
#[must_use]
pub fn new(subject: impl Into<String>, namespaces: impl IntoIterator<Item = String>) -> Self {
Self {
subject: subject.into(),
namespaces: namespaces.into_iter().collect(),
denial_reason: None,
grant_source: GrantSource::NamespacesHeader,
}
}
#[must_use]
pub fn from_token_claims(
subject: impl Into<String>,
namespaces: impl IntoIterator<Item = String>,
) -> Self {
Self {
subject: subject.into(),
namespaces: namespaces.into_iter().collect(),
denial_reason: None,
grant_source: GrantSource::TokenClaim,
}
}
#[must_use]
pub fn denied(subject: impl Into<String>, reason: impl Into<String>) -> Self {
Self {
subject: subject.into(),
namespaces: BTreeSet::new(),
denial_reason: Some(reason.into()),
grant_source: GrantSource::NamespacesHeader,
}
}
#[must_use]
pub fn subject(&self) -> &str {
&self.subject
}
fn can_access(&self, namespace: &str) -> bool {
self.namespaces.contains(namespace)
}
fn denial_reason(&self) -> Option<&str> {
self.denial_reason.as_deref()
}
}
#[derive(Clone)]
pub struct ScopedEngine {
namespace: String,
engine: Option<Arc<Engine>>,
}
impl ScopedEngine {
#[must_use]
pub fn namespace(&self) -> &str {
&self.namespace
}
pub fn engine(&self) -> Result<&Arc<Engine>, ServerError> {
self.engine.as_ref().ok_or_else(|| ServerError::Config {
message: "namespace resolver has no engine handle".to_owned(),
})
}
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct WorkflowAttribution {
pub namespace: String,
pub workflow_type: Option<String>,
}
#[async_trait]
pub trait WorkflowNamespaceSource: Send + Sync {
async fn workflow_attribution(
&self,
workflow_id: &WorkflowId,
) -> Result<Option<WorkflowAttribution>, ServerError>;
}
struct HistoryNamespaceSource {
engine: Arc<Engine>,
}
#[async_trait]
impl WorkflowNamespaceSource for HistoryNamespaceSource {
async fn workflow_attribution(
&self,
workflow_id: &WorkflowId,
) -> Result<Option<WorkflowAttribution>, ServerError> {
let history = self
.engine
.store()
.read_history(workflow_id)
.await
.map_err(ServerError::from)?;
let namespace = match search_attributes_from_events(&history).remove(NAMESPACE_ATTRIBUTE) {
Some(SearchAttributeValue::String(namespace)) => namespace,
Some(other) => {
return Err(ServerError::Config {
message: format!(
"workflow {workflow_id} recorded a non-string {NAMESPACE_ATTRIBUTE} search attribute: {other:?}"
),
});
}
None => return Ok(None),
};
let workflow_type = history.iter().rev().find_map(|event| match event {
aion_core::Event::WorkflowStarted { workflow_type, .. } => Some(workflow_type.clone()),
_ => None,
});
Ok(Some(WorkflowAttribution {
namespace,
workflow_type,
}))
}
}
#[derive(Clone, Default)]
pub struct StaticWorkflowNamespaces {
inner: Arc<RwLock<HashMap<WorkflowId, WorkflowAttribution>>>,
}
impl StaticWorkflowNamespaces {
pub fn record(&self, workflow_id: WorkflowId, namespace: &str) -> Result<(), ServerError> {
self.insert(
workflow_id,
WorkflowAttribution {
namespace: namespace.to_owned(),
workflow_type: None,
},
)
}
pub fn record_with_type(
&self,
workflow_id: WorkflowId,
namespace: &str,
workflow_type: &str,
) -> Result<(), ServerError> {
self.insert(
workflow_id,
WorkflowAttribution {
namespace: namespace.to_owned(),
workflow_type: Some(workflow_type.to_owned()),
},
)
}
fn insert(
&self,
workflow_id: WorkflowId,
attribution: WorkflowAttribution,
) -> Result<(), ServerError> {
let mut ownership = self
.inner
.write()
.map_err(|_| ServerError::lock_poisoned("namespace workflow ownership"))?;
ownership.insert(workflow_id, attribution);
Ok(())
}
}
#[async_trait]
impl WorkflowNamespaceSource for StaticWorkflowNamespaces {
async fn workflow_attribution(
&self,
workflow_id: &WorkflowId,
) -> Result<Option<WorkflowAttribution>, ServerError> {
let ownership = self
.inner
.read()
.map_err(|_| ServerError::lock_poisoned("namespace workflow ownership"))?;
Ok(ownership.get(workflow_id).cloned())
}
}
#[derive(Clone)]
pub struct NamespaceResolver {
mode: NamespaceMode,
engine: Option<Arc<Engine>>,
ownership: Arc<dyn WorkflowNamespaceSource>,
schedule_ownership: Arc<dyn ScheduleNamespaceSource>,
}
impl NamespaceResolver {
#[must_use]
pub fn from_config(config: NamespaceConfig, engine: Arc<Engine>) -> Self {
Self {
mode: config.mode,
ownership: Arc::new(HistoryNamespaceSource {
engine: Arc::clone(&engine),
}),
schedule_ownership: Arc::new(HistoryScheduleNamespaceSource::new(Arc::clone(&engine))),
engine: Some(engine),
}
}
#[must_use]
pub fn from_parts(
mode: NamespaceMode,
engine: Option<Arc<Engine>>,
ownership: Arc<dyn WorkflowNamespaceSource>,
schedule_ownership: Arc<dyn ScheduleNamespaceSource>,
) -> Self {
Self {
mode,
engine,
ownership,
schedule_ownership,
}
}
#[must_use]
pub fn authorization_only(
mode: NamespaceMode,
ownership: impl WorkflowNamespaceSource + 'static,
schedule_ownership: impl ScheduleNamespaceSource + 'static,
) -> Self {
Self::from_parts(
mode,
None,
Arc::new(ownership),
Arc::new(schedule_ownership),
)
}
#[must_use]
pub const fn mode(&self) -> &NamespaceMode {
&self.mode
}
pub fn shutdown_engine(&self) -> Result<(), ServerError> {
self.engine
.as_ref()
.ok_or_else(|| ServerError::Config {
message: "namespace resolver has no engine handle".to_owned(),
})?
.shutdown()
.map_err(ServerError::from)
}
pub(super) fn resolve(
&self,
caller: &CallerIdentity,
requested_namespace: &str,
) -> Result<ScopedEngine, ServerError> {
if requested_namespace.is_empty() {
return Err(ServerError::namespace_denied(
"requested namespace must not be empty",
));
}
if let Some(reason) = caller.denial_reason() {
return Err(ServerError::namespace_denied(reason));
}
match &self.mode {
NamespaceMode::SingleTenant { namespace } if namespace == requested_namespace => {
Ok(self.scoped(requested_namespace))
}
NamespaceMode::SharedEngine if caller.can_access(requested_namespace) => {
Ok(self.scoped(requested_namespace))
}
NamespaceMode::SingleTenant { .. } | NamespaceMode::SharedEngine => {
Err(namespace_denied(caller, requested_namespace))
}
}
}
pub async fn verify_workflow_ownership(
&self,
namespace: &str,
workflow_id: &WorkflowId,
) -> Result<(), ServerError> {
match self.workflow_attribution(namespace, workflow_id).await? {
Some(_) => Ok(()),
None => Err(ServerError::Wire {
wire: WireError::not_found(format!("workflow not found in namespace {namespace}")),
}),
}
}
pub async fn workflow_attribution(
&self,
namespace: &str,
workflow_id: &WorkflowId,
) -> Result<Option<WorkflowAttribution>, ServerError> {
Ok(self
.ownership
.workflow_attribution(workflow_id)
.await?
.filter(|attribution| attribution.namespace == namespace))
}
pub async fn verify_schedule_ownership(
&self,
namespace: &str,
schedule_id: &ScheduleId,
) -> Result<(), ServerError> {
match self
.schedule_ownership
.schedule_namespace(schedule_id)
.await?
{
Some(owner) if owner == namespace => Ok(()),
Some(_) | None => Err(ServerError::Wire {
wire: WireError::not_found(format!("schedule not found in namespace {namespace}")),
}),
}
}
fn scoped(&self, namespace: &str) -> ScopedEngine {
ScopedEngine {
namespace: namespace.to_owned(),
engine: self.engine.clone(),
}
}
}
fn namespace_denied(caller: &CallerIdentity, requested_namespace: &str) -> ServerError {
let hint = match caller.grant_source {
GrantSource::NamespacesHeader => format!(
"add {requested_namespace} to x-aion-namespaces for subject `{}` or request a namespace listed in that header",
caller.subject()
),
GrantSource::TokenClaim => format!(
"grant {requested_namespace} in the namespace claim of the token minted for subject `{}` or request a namespace the token grants",
caller.subject()
),
};
ServerError::namespace_denied(format!(
"subject not authorized for namespace {requested_namespace}; {hint}"
))
}
#[cfg(test)]
mod tests {
use super::{
CallerIdentity, NamespaceResolver, StaticWorkflowNamespaces, WorkflowNamespaceSource,
};
use crate::config::NamespaceMode;
use crate::namespace::StaticScheduleNamespaces;
use aion_core::{ScheduleId, WorkflowId};
fn resolver(mode: NamespaceMode) -> NamespaceResolver {
NamespaceResolver::authorization_only(
mode,
StaticWorkflowNamespaces::default(),
StaticScheduleNamespaces::default(),
)
}
#[test]
fn shared_engine_authorizes_explicit_caller_grant() -> Result<(), Box<dyn std::error::Error>> {
let resolver = resolver(NamespaceMode::SharedEngine);
let caller = CallerIdentity::new("alice", [String::from("tenant-a")]);
let scoped = resolver.resolve(&caller, "tenant-a")?;
assert_eq!(scoped.namespace(), "tenant-a");
Ok(())
}
#[test]
fn shared_engine_denies_missing_caller_grant() {
let resolver = resolver(NamespaceMode::SharedEngine);
let caller = CallerIdentity::new("alice", [String::from("tenant-a")]);
let denied = resolver.resolve(&caller, "tenant-b");
assert!(denied.is_err());
}
#[test]
fn single_tenant_authorizes_only_configured_namespace() -> Result<(), Box<dyn std::error::Error>>
{
let resolver = resolver(NamespaceMode::SingleTenant {
namespace: String::from("tenant-a"),
});
let caller = CallerIdentity::new("alice", [String::from("tenant-b")]);
let scoped = resolver.resolve(&caller, "tenant-a")?;
let denied = resolver.resolve(&caller, "tenant-b");
assert_eq!(scoped.namespace(), "tenant-a");
assert!(denied.is_err());
Ok(())
}
#[test]
fn denial_hint_names_the_grant_source() -> Result<(), Box<dyn std::error::Error>> {
let resolver = resolver(NamespaceMode::SharedEngine);
let header_caller = CallerIdentity::new("alice", [String::from("tenant-a")]);
let header_denial = resolver
.resolve(&header_caller, "tenant-b")
.err()
.map(|error| error.to_wire_error())
.ok_or("expected header-sourced caller to be denied")?;
assert!(
header_denial.message.contains("x-aion-namespaces"),
"header-path denial must hint the dev header: {}",
header_denial.message
);
assert!(
!header_denial.message.contains("namespace claim"),
"header-path denial must not hint the token claim: {}",
header_denial.message
);
let token_caller = CallerIdentity::from_token_claims("alice", [String::from("tenant-a")]);
let token_denial = resolver
.resolve(&token_caller, "tenant-b")
.err()
.map(|error| error.to_wire_error())
.ok_or("expected token-sourced caller to be denied")?;
assert!(
token_denial.message.contains("namespace claim"),
"JWT-path denial must hint the token's namespace claim: {}",
token_denial.message
);
assert!(
!token_denial.message.contains("x-aion-namespaces"),
"JWT-path denial must not hint the dev header: {}",
token_denial.message
);
Ok(())
}
#[test]
fn empty_namespace_is_denied_before_scoping() {
let resolver = resolver(NamespaceMode::SharedEngine);
let caller = CallerIdentity::new("alice", [String::new()]);
let denied = resolver.resolve(&caller, "");
assert!(denied.is_err());
}
#[tokio::test]
async fn ownership_misses_are_indistinguishable_not_found()
-> Result<(), Box<dyn std::error::Error>> {
let ownership = StaticWorkflowNamespaces::default();
let owned = WorkflowId::new(uuid::Uuid::from_u128(1));
let unknown = WorkflowId::new(uuid::Uuid::from_u128(2));
ownership.record(owned.clone(), "tenant-a")?;
let resolver = NamespaceResolver::authorization_only(
NamespaceMode::SharedEngine,
ownership,
StaticScheduleNamespaces::default(),
);
resolver
.verify_workflow_ownership("tenant-a", &owned)
.await?;
let foreign = resolver
.verify_workflow_ownership("tenant-b", &owned)
.await
.err()
.map(|error| error.to_wire_error())
.ok_or("expected foreign-owned workflow to be rejected")?;
let absent = resolver
.verify_workflow_ownership("tenant-b", &unknown)
.await
.err()
.map(|error| error.to_wire_error())
.ok_or("expected unknown workflow to be rejected")?;
assert_eq!(foreign.code, aion_proto::WireErrorCode::NotFound);
assert_eq!(foreign, absent);
assert_eq!(foreign.message, "workflow not found in namespace tenant-b");
let absent_in_granted = resolver
.verify_workflow_ownership("tenant-a", &unknown)
.await
.err()
.map(|error| error.to_wire_error())
.ok_or("expected unknown workflow to be rejected in granted namespace")?;
assert_eq!(absent_in_granted.code, aion_proto::WireErrorCode::NotFound);
assert_eq!(
absent_in_granted.message,
"workflow not found in namespace tenant-a"
);
Ok(())
}
#[tokio::test]
async fn schedule_ownership_misses_are_indistinguishable_not_found()
-> Result<(), Box<dyn std::error::Error>> {
let schedule_ownership = StaticScheduleNamespaces::default();
let owned = ScheduleId::new(uuid::Uuid::from_u128(1));
let unknown = ScheduleId::new(uuid::Uuid::from_u128(2));
schedule_ownership.record(owned.clone(), "tenant-a")?;
let resolver = NamespaceResolver::authorization_only(
NamespaceMode::SharedEngine,
StaticWorkflowNamespaces::default(),
schedule_ownership,
);
resolver
.verify_schedule_ownership("tenant-a", &owned)
.await?;
let foreign = resolver
.verify_schedule_ownership("tenant-b", &owned)
.await
.err()
.map(|error| error.to_wire_error())
.ok_or("expected foreign-owned schedule to be rejected")?;
let absent = resolver
.verify_schedule_ownership("tenant-b", &unknown)
.await
.err()
.map(|error| error.to_wire_error())
.ok_or("expected unknown schedule to be rejected")?;
assert_eq!(foreign.code, aion_proto::WireErrorCode::NotFound);
assert_eq!(foreign, absent);
assert_eq!(foreign.message, "schedule not found in namespace tenant-b");
let absent_in_granted = resolver
.verify_schedule_ownership("tenant-a", &unknown)
.await
.err()
.map(|error| error.to_wire_error())
.ok_or("expected unknown schedule to be rejected in granted namespace")?;
assert_eq!(absent_in_granted.code, aion_proto::WireErrorCode::NotFound);
assert_eq!(
absent_in_granted.message,
"schedule not found in namespace tenant-a"
);
Ok(())
}
#[tokio::test]
async fn static_source_reports_recorded_namespace() -> Result<(), Box<dyn std::error::Error>> {
let ownership = StaticWorkflowNamespaces::default();
let workflow_id = WorkflowId::new(uuid::Uuid::from_u128(3));
ownership.record(workflow_id.clone(), "tenant-a")?;
assert_eq!(
ownership.workflow_attribution(&workflow_id).await?,
Some(super::WorkflowAttribution {
namespace: String::from("tenant-a"),
workflow_type: None,
})
);
Ok(())
}
#[tokio::test]
async fn static_source_reports_recorded_workflow_type() -> Result<(), Box<dyn std::error::Error>>
{
let ownership = StaticWorkflowNamespaces::default();
let workflow_id = WorkflowId::new(uuid::Uuid::from_u128(4));
ownership.record_with_type(workflow_id.clone(), "tenant-a", "checkout")?;
assert_eq!(
ownership.workflow_attribution(&workflow_id).await?,
Some(super::WorkflowAttribution {
namespace: String::from("tenant-a"),
workflow_type: Some(String::from("checkout")),
})
);
Ok(())
}
#[tokio::test]
async fn scoped_attribution_hides_foreign_and_unknown_identically()
-> Result<(), Box<dyn std::error::Error>> {
let ownership = StaticWorkflowNamespaces::default();
let owned = WorkflowId::new(uuid::Uuid::from_u128(5));
let foreign = WorkflowId::new(uuid::Uuid::from_u128(6));
let unknown = WorkflowId::new(uuid::Uuid::from_u128(7));
ownership.record_with_type(owned.clone(), "tenant-a", "checkout")?;
ownership.record_with_type(foreign.clone(), "tenant-b", "checkout")?;
let resolver = NamespaceResolver::authorization_only(
NamespaceMode::SharedEngine,
ownership,
StaticScheduleNamespaces::default(),
);
let visible = resolver
.workflow_attribution("tenant-a", &owned)
.await?
.ok_or("owned workflow attribution must be visible")?;
assert_eq!(visible.workflow_type.as_deref(), Some("checkout"));
assert_eq!(
resolver.workflow_attribution("tenant-a", &foreign).await?,
None
);
assert_eq!(
resolver.workflow_attribution("tenant-a", &unknown).await?,
None
);
Ok(())
}
}