use aion::EventFilter;
use aion_core::{RunId, ScheduleId, WorkflowFilter, WorkflowId};
use aion_proto::{
FilteredSubscription, FirehoseSubscription, PerWorkflowSubscription, ProtoCancelRequest,
ProtoCountWorkflowsRequest, ProtoCreateScheduleRequest, ProtoDescribeWorkflowRequest,
ProtoListSchedulesRequest, ProtoListWorkflowsRequest, ProtoQueryRequest, ProtoRegisterWorker,
ProtoScheduleIdRequest, ProtoSignalRequest, ProtoStartWorkflowRequest,
ProtoUpdateScheduleRequest, SubscriptionRequest, subscription_request,
};
use crate::error::ServerError;
use super::resolver::{CallerIdentity, NamespaceResolver, ScopedEngine};
/// Authorization gate that turns a caller plus a requested operation into a [`ScopedEngine`].
///
/// The guard owns a [`NamespaceResolver`] and delegates every decision to it; it holds no
/// other state.
#[derive(Clone)]
pub struct NamespaceGuard {
    resolver: NamespaceResolver,
}

impl NamespaceGuard {
    /// Wraps `resolver` in a guard.
    #[must_use]
    pub const fn new(resolver: NamespaceResolver) -> Self {
        Self { resolver }
    }

    /// Borrows the resolver this guard delegates to.
    #[must_use]
    pub const fn resolver(&self) -> &NamespaceResolver {
        &self.resolver
    }

    /// Authorizes `operation` for `caller` and returns the engine scope on success.
    ///
    /// Two steps run in order:
    /// 1. the namespace named by the operation is resolved for the caller;
    /// 2. the operation's own target checks run against the namespace of the resolved scope.
    ///
    /// # Errors
    ///
    /// Propagates the error of whichever step fails first; the target checks are not
    /// attempted when namespace resolution fails.
    pub async fn scope(
        &self,
        caller: &CallerIdentity,
        operation: &NamespaceOperation<'_>,
    ) -> Result<ScopedEngine, ServerError> {
        let resolver = &self.resolver;
        let scoped = resolver.resolve(caller, operation.requested_namespace())?;
        operation.verify(resolver, scoped.namespace()).await?;
        Ok(scoped)
    }
}
/// A request that must pass namespace authorization, borrowed for `'a`.
///
/// Each variant carries the decoded protobuf request (the source of the requested
/// namespace) and, for operations aimed at a specific workflow or schedule, the already
/// decoded target whose ownership is verified.
pub enum NamespaceOperation<'a> {
    /// Start a workflow.
    StartWorkflow(&'a ProtoStartWorkflowRequest),
    /// Signal the targeted workflow.
    Signal(&'a ProtoSignalRequest, WorkflowTarget<'a>),
    /// Query the targeted workflow.
    Query(&'a ProtoQueryRequest, WorkflowTarget<'a>),
    /// Cancel the targeted workflow.
    Cancel(&'a ProtoCancelRequest, WorkflowTarget<'a>),
    /// List workflows using the given filter.
    ListWorkflows(&'a ProtoListWorkflowsRequest, &'a WorkflowFilter),
    /// Count workflows.
    CountWorkflows(&'a ProtoCountWorkflowsRequest),
    /// Describe the targeted workflow.
    Describe(&'a ProtoDescribeWorkflowRequest, WorkflowTarget<'a>),
    /// Create a schedule.
    CreateSchedule(&'a ProtoCreateScheduleRequest),
    /// Update the targeted schedule.
    UpdateSchedule(&'a ProtoUpdateScheduleRequest, ScheduleTarget<'a>),
    /// Pause the targeted schedule.
    PauseSchedule(&'a ProtoScheduleIdRequest, ScheduleTarget<'a>),
    /// Resume the targeted schedule.
    ResumeSchedule(&'a ProtoScheduleIdRequest, ScheduleTarget<'a>),
    /// Delete the targeted schedule.
    DeleteSchedule(&'a ProtoScheduleIdRequest, ScheduleTarget<'a>),
    /// List schedules.
    ListSchedules(&'a ProtoListSchedulesRequest),
    /// Describe the targeted schedule.
    DescribeSchedule(&'a ProtoScheduleIdRequest, ScheduleTarget<'a>),
    /// Open an event subscription with the given scope and event filter.
    Subscribe(SubscriptionScope<'a>, &'a EventFilter),
    /// Register a worker.
    RegisterWorker(&'a ProtoRegisterWorker),
}

impl<'a> NamespaceOperation<'a> {
    /// Builds a [`NamespaceOperation::StartWorkflow`].
    #[must_use]
    pub const fn start(request: &'a ProtoStartWorkflowRequest) -> Self {
        Self::StartWorkflow(request)
    }

    /// Builds a [`NamespaceOperation::Signal`] aimed at `target`.
    #[must_use]
    pub const fn signal(request: &'a ProtoSignalRequest, target: WorkflowTarget<'a>) -> Self {
        Self::Signal(request, target)
    }

    /// Builds a [`NamespaceOperation::Query`] aimed at `target`.
    #[must_use]
    pub const fn query(request: &'a ProtoQueryRequest, target: WorkflowTarget<'a>) -> Self {
        Self::Query(request, target)
    }

    /// Builds a [`NamespaceOperation::Cancel`] aimed at `target`.
    #[must_use]
    pub const fn cancel(request: &'a ProtoCancelRequest, target: WorkflowTarget<'a>) -> Self {
        Self::Cancel(request, target)
    }

    /// Builds a [`NamespaceOperation::ListWorkflows`] with the decoded `filter`.
    #[must_use]
    pub const fn list(request: &'a ProtoListWorkflowsRequest, filter: &'a WorkflowFilter) -> Self {
        Self::ListWorkflows(request, filter)
    }

    /// Builds a [`NamespaceOperation::CountWorkflows`].
    #[must_use]
    pub const fn count(request: &'a ProtoCountWorkflowsRequest) -> Self {
        Self::CountWorkflows(request)
    }

    /// Builds a [`NamespaceOperation::Describe`] aimed at `target`.
    #[must_use]
    pub const fn describe(
        request: &'a ProtoDescribeWorkflowRequest,
        target: WorkflowTarget<'a>,
    ) -> Self {
        Self::Describe(request, target)
    }

    /// Builds a [`NamespaceOperation::CreateSchedule`].
    #[must_use]
    pub const fn create_schedule(request: &'a ProtoCreateScheduleRequest) -> Self {
        Self::CreateSchedule(request)
    }

    /// Builds a [`NamespaceOperation::UpdateSchedule`] aimed at `target`.
    #[must_use]
    pub const fn update_schedule(
        request: &'a ProtoUpdateScheduleRequest,
        target: ScheduleTarget<'a>,
    ) -> Self {
        Self::UpdateSchedule(request, target)
    }

    /// Builds a [`NamespaceOperation::PauseSchedule`] aimed at `target`.
    #[must_use]
    pub const fn pause_schedule(
        request: &'a ProtoScheduleIdRequest,
        target: ScheduleTarget<'a>,
    ) -> Self {
        Self::PauseSchedule(request, target)
    }

    /// Builds a [`NamespaceOperation::ResumeSchedule`] aimed at `target`.
    #[must_use]
    pub const fn resume_schedule(
        request: &'a ProtoScheduleIdRequest,
        target: ScheduleTarget<'a>,
    ) -> Self {
        Self::ResumeSchedule(request, target)
    }

    /// Builds a [`NamespaceOperation::DeleteSchedule`] aimed at `target`.
    #[must_use]
    pub const fn delete_schedule(
        request: &'a ProtoScheduleIdRequest,
        target: ScheduleTarget<'a>,
    ) -> Self {
        Self::DeleteSchedule(request, target)
    }

    /// Builds a [`NamespaceOperation::ListSchedules`].
    #[must_use]
    pub const fn list_schedules(request: &'a ProtoListSchedulesRequest) -> Self {
        Self::ListSchedules(request)
    }

    /// Builds a [`NamespaceOperation::DescribeSchedule`] aimed at `target`.
    #[must_use]
    pub const fn describe_schedule(
        request: &'a ProtoScheduleIdRequest,
        target: ScheduleTarget<'a>,
    ) -> Self {
        Self::DescribeSchedule(request, target)
    }

    /// Builds a [`NamespaceOperation::Subscribe`] for `scope` and `filter`.
    #[must_use]
    pub const fn subscribe(scope: SubscriptionScope<'a>, filter: &'a EventFilter) -> Self {
        Self::Subscribe(scope, filter)
    }

    /// Builds a [`NamespaceOperation::RegisterWorker`].
    #[must_use]
    pub const fn register_worker(request: &'a ProtoRegisterWorker) -> Self {
        Self::RegisterWorker(request)
    }

    /// Returns the namespace string the request itself names.
    ///
    /// Every variant reads the `namespace` field of its request, except `Subscribe`, which
    /// asks the subscription scope for it.
    fn requested_namespace(&self) -> &str {
        match self {
            // Workflow-facing requests.
            Self::StartWorkflow(request) => request.namespace.as_str(),
            Self::Signal(request, _) => request.namespace.as_str(),
            Self::Query(request, _) => request.namespace.as_str(),
            Self::Cancel(request, _) => request.namespace.as_str(),
            Self::Describe(request, _) => request.namespace.as_str(),
            Self::ListWorkflows(request, _) => request.namespace.as_str(),
            Self::CountWorkflows(request) => request.namespace.as_str(),
            // Schedule-facing requests; the id-only operations share one request type.
            Self::CreateSchedule(request) => request.namespace.as_str(),
            Self::UpdateSchedule(request, _) => request.namespace.as_str(),
            Self::ListSchedules(request) => request.namespace.as_str(),
            Self::PauseSchedule(request, _)
            | Self::ResumeSchedule(request, _)
            | Self::DeleteSchedule(request, _)
            | Self::DescribeSchedule(request, _) => request.namespace.as_str(),
            // Remaining requests.
            Self::RegisterWorker(request) => request.namespace.as_str(),
            Self::Subscribe(scope, _) => scope.namespace(),
        }
    }

    /// Runs the target-level checks for this operation inside `authorized_namespace`.
    ///
    /// Operations without a specific target succeed immediately; targeted operations
    /// delegate to the target's own `verify`, and subscriptions to the scope's `verify`.
    ///
    /// # Errors
    ///
    /// Returns whatever error the delegated check produces.
    async fn verify(
        &self,
        resolver: &NamespaceResolver,
        authorized_namespace: &str,
    ) -> Result<(), ServerError> {
        match self {
            // No specific workflow or schedule is named, so there is nothing further to check.
            Self::StartWorkflow(..)
            | Self::ListWorkflows(..)
            | Self::CountWorkflows(..)
            | Self::CreateSchedule(..)
            | Self::ListSchedules(..)
            | Self::RegisterWorker(..) => Ok(()),
            Self::Signal(_, workflow)
            | Self::Query(_, workflow)
            | Self::Cancel(_, workflow)
            | Self::Describe(_, workflow) => workflow.verify(resolver, authorized_namespace).await,
            Self::UpdateSchedule(_, schedule)
            | Self::PauseSchedule(_, schedule)
            | Self::ResumeSchedule(_, schedule)
            | Self::DeleteSchedule(_, schedule)
            | Self::DescribeSchedule(_, schedule) => {
                schedule.verify(resolver, authorized_namespace).await
            }
            Self::Subscribe(scope, filter) => {
                scope.verify(resolver, authorized_namespace, filter).await
            }
        }
    }
}
/// Borrowed identifiers of the workflow an operation is aimed at.
///
/// Stores a workflow id and an optional run id, both borrowed for `'a`. The type is `Copy`,
/// so one target can be shared between several operations.
#[derive(Clone, Copy)]
pub struct WorkflowTarget<'a> {
    workflow_id: &'a WorkflowId,
    run_id: Option<&'a RunId>,
}

impl<'a> WorkflowTarget<'a> {
    /// Targets a specific run of a workflow.
    #[must_use]
    pub const fn with_run(workflow_id: &'a WorkflowId, run_id: &'a RunId) -> Self {
        Self {
            workflow_id,
            run_id: Some(run_id),
        }
    }

    /// Targets a workflow without naming a run.
    #[must_use]
    pub const fn workflow(workflow_id: &'a WorkflowId) -> Self {
        Self {
            workflow_id,
            run_id: None,
        }
    }

    /// Returns the targeted workflow id.
    ///
    /// The reference carries the full `'a` lifetime of the stored borrow rather than the
    /// lifetime of `&self`, so it stays usable after a temporary or copied target is gone.
    #[must_use]
    pub const fn workflow_id(&self) -> &'a WorkflowId {
        self.workflow_id
    }

    /// Returns the targeted run id, if one was supplied.
    ///
    /// As with [`WorkflowTarget::workflow_id`], the reference lives for `'a`, not for the
    /// borrow of `self`.
    #[must_use]
    pub const fn run_id(&self) -> Option<&'a RunId> {
        self.run_id
    }

    /// Asks `resolver` to verify that the workflow is owned by `namespace`.
    ///
    /// Only the workflow id is passed to the resolver; the run id is not consulted here.
    ///
    /// # Errors
    ///
    /// Returns the resolver's error when ownership verification fails.
    async fn verify(
        &self,
        resolver: &NamespaceResolver,
        namespace: &str,
    ) -> Result<(), ServerError> {
        resolver
            .verify_workflow_ownership(namespace, self.workflow_id)
            .await
    }
}
/// Borrowed identifier of the schedule an operation is aimed at.
///
/// The type is `Copy`, so one target can be shared between several operations.
#[derive(Clone, Copy)]
pub struct ScheduleTarget<'a> {
    schedule_id: &'a ScheduleId,
}

impl<'a> ScheduleTarget<'a> {
    /// Targets the schedule identified by `schedule_id`.
    #[must_use]
    pub const fn schedule(schedule_id: &'a ScheduleId) -> Self {
        Self { schedule_id }
    }

    /// Returns the targeted schedule id.
    ///
    /// The reference carries the full `'a` lifetime of the stored borrow rather than the
    /// lifetime of `&self`, so it stays usable after a temporary or copied target is gone.
    #[must_use]
    pub const fn schedule_id(&self) -> &'a ScheduleId {
        self.schedule_id
    }

    /// Asks `resolver` to verify that the schedule is owned by `namespace`.
    ///
    /// # Errors
    ///
    /// Returns the resolver's error when ownership verification fails.
    async fn verify(
        &self,
        resolver: &NamespaceResolver,
        namespace: &str,
    ) -> Result<(), ServerError> {
        resolver
            .verify_schedule_ownership(namespace, self.schedule_id)
            .await
    }
}
/// The flavour of event subscription being authorized, borrowed for `'a`.
pub enum SubscriptionScope<'a> {
    /// Subscription to a single workflow, together with its decoded target.
    PerWorkflow(&'a PerWorkflowSubscription, WorkflowTarget<'a>),
    /// Subscription narrowed by the fields of a filtered-subscription message.
    Filtered(&'a FilteredSubscription),
    /// Firehose subscription.
    Firehose(&'a FirehoseSubscription),
}

impl<'a> SubscriptionScope<'a> {
    /// Builds the scope that matches the variant set on `request`.
    ///
    /// `workflow_target` is only used for per-workflow subscriptions, where it is required.
    ///
    /// # Errors
    ///
    /// Returns a namespace-denied error when the request carries no subscription variant,
    /// or when a per-workflow subscription arrives without a decoded `workflow_target`.
    pub fn from_request(
        request: &'a SubscriptionRequest,
        workflow_target: Option<WorkflowTarget<'a>>,
    ) -> Result<Self, ServerError> {
        let Some(subscription) = request.subscription.as_ref() else {
            return Err(ServerError::namespace_denied(
                "subscription request must name a namespace",
            ));
        };

        match subscription {
            subscription_request::Subscription::PerWorkflow(per_workflow) => {
                match workflow_target {
                    Some(target) => Ok(Self::PerWorkflow(per_workflow, target)),
                    None => Err(ServerError::namespace_denied(
                        "per-workflow subscription target must be decoded before guard scope",
                    )),
                }
            }
            subscription_request::Subscription::Filtered(filtered) => Ok(Self::Filtered(filtered)),
            subscription_request::Subscription::Firehose(firehose) => Ok(Self::Firehose(firehose)),
        }
    }

    /// Returns the `namespace` field of the underlying subscription message.
    fn namespace(&self) -> &str {
        let requested = match self {
            Self::PerWorkflow(per_workflow, _) => per_workflow.namespace.as_str(),
            Self::Filtered(filtered) => filtered.namespace.as_str(),
            Self::Firehose(firehose) => firehose.namespace.as_str(),
        };
        requested
    }

    /// Checks the subscription against the authorized `namespace`.
    ///
    /// A filtered subscription first has its namespace selector compared with `namespace`.
    /// Every variant then has the event filter's workflow checked; only the per-workflow
    /// variant supplies an explicit target for that check.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by the selector or filter-target check.
    async fn verify(
        &self,
        resolver: &NamespaceResolver,
        namespace: &str,
        filter: &EventFilter,
    ) -> Result<(), ServerError> {
        let explicit_target = match self {
            Self::PerWorkflow(_, target) => Some(*target),
            Self::Filtered(filtered) => {
                verify_namespace_selector(filtered.namespace_selector.as_deref(), namespace)?;
                None
            }
            Self::Firehose(_) => None,
        };
        verify_subscription_filter_target(filter, explicit_target, resolver, namespace).await
    }
}
/// Rejects a namespace selector that names anything other than `namespace`.
///
/// An absent selector, or one equal to `namespace`, is accepted.
///
/// # Errors
///
/// Returns a namespace-denied error when a selector is present and differs from `namespace`.
fn verify_namespace_selector(selector: Option<&str>, namespace: &str) -> Result<(), ServerError> {
    let names_other_namespace = selector.is_some_and(|requested| requested != namespace);
    if names_other_namespace {
        return Err(ServerError::namespace_denied(
            "subscription namespace selector is not authorized",
        ));
    }
    Ok(())
}
/// Verifies the workflow a subscription would observe against `namespace`.
///
/// The outcome depends on which of the two workflow sources are present:
/// - explicit target and a filter workflow that differs from it: denied without
///   consulting the resolver;
/// - explicit target otherwise: the target's ownership is verified;
/// - filter workflow only: that workflow's ownership is verified;
/// - neither: accepted.
///
/// # Errors
///
/// Returns a namespace-denied error on a target/filter mismatch, or the resolver's error
/// when ownership verification fails.
async fn verify_subscription_filter_target(
    filter: &EventFilter,
    explicit_target: Option<WorkflowTarget<'_>>,
    resolver: &NamespaceResolver,
    namespace: &str,
) -> Result<(), ServerError> {
    let filter_workflow = filter.workflow_id.as_ref();
    match (explicit_target, filter_workflow) {
        (Some(target), Some(workflow_id)) if workflow_id != target.workflow_id() => {
            Err(ServerError::namespace_denied(
                "subscription filter workflow does not match decoded target",
            ))
        }
        (Some(target), _) => target.verify(resolver, namespace).await,
        (None, Some(workflow_id)) => {
            resolver
                .verify_workflow_ownership(namespace, workflow_id)
                .await
        }
        (None, None) => Ok(()),
    }
}
#[cfg(test)]
mod tests {
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use aion_core::{RunId, ScheduleId, WorkflowFilter, WorkflowId};
use aion_proto::{
FilteredSubscription, FirehoseSubscription, PerWorkflowSubscription, ProtoCancelRequest,
ProtoCreateScheduleRequest, ProtoDescribeWorkflowRequest, ProtoListSchedulesRequest,
ProtoListWorkflowsRequest, ProtoQueryRequest, ProtoRegisterWorker, ProtoScheduleIdRequest,
ProtoSignalRequest, ProtoStartWorkflowRequest, ProtoUpdateScheduleRequest,
};
use async_trait::async_trait;
use super::{
NamespaceGuard, NamespaceOperation, ScheduleTarget, SubscriptionScope, WorkflowTarget,
};
use crate::config::NamespaceMode;
use crate::error::ServerError;
use crate::namespace::{
CallerIdentity, NamespaceResolver, ScheduleNamespaceSource, StaticScheduleNamespaces,
StaticWorkflowNamespaces,
};
/// Test double that keeps a list of call names behind a mutex.
///
/// Nothing in this type appends to the list; it only exposes a snapshot of it.
struct RecordingFakeEngine {
    calls: Mutex<Vec<&'static str>>,
}

impl RecordingFakeEngine {
    /// Creates a fake with an empty call list.
    fn new() -> Self {
        let calls = Mutex::new(Vec::new());
        Self { calls }
    }

    /// Returns a copy of the recorded call names.
    ///
    /// Fails with a descriptive error when the mutex has been poisoned.
    fn calls(&self) -> Result<Vec<&'static str>, Box<dyn std::error::Error>> {
        match self.calls.lock() {
            Ok(recorded) => Ok(recorded.clone()),
            Err(_) => Err("fake engine calls lock poisoned".into()),
        }
    }
}
/// Schedule-namespace source that counts lookups before delegating to a static source.
///
/// Clones share the same counter because it lives behind an `Arc`.
#[derive(Clone)]
struct CountingScheduleNamespaces {
    inner: StaticScheduleNamespaces,
    calls: Arc<AtomicUsize>,
}

impl CountingScheduleNamespaces {
    /// Wraps `inner` with a lookup counter starting at zero.
    fn wrapping(inner: StaticScheduleNamespaces) -> Self {
        let calls = Arc::new(AtomicUsize::new(0));
        Self { inner, calls }
    }

    /// Returns how many lookups have been made so far.
    fn calls(&self) -> usize {
        let counter: &AtomicUsize = &self.calls;
        counter.load(Ordering::SeqCst)
    }
}

#[async_trait]
impl ScheduleNamespaceSource for CountingScheduleNamespaces {
    /// Bumps the counter, then forwards the lookup to the wrapped source.
    async fn schedule_namespace(
        &self,
        schedule_id: &ScheduleId,
    ) -> Result<Option<String>, ServerError> {
        let _previous = self.calls.fetch_add(1, Ordering::SeqCst);
        self.inner.schedule_namespace(schedule_id).await
    }
}
/// Builds a shared-engine guard that uses `ownership` for workflows and an empty
/// default source for schedules.
fn guard_with_ownership(ownership: StaticWorkflowNamespaces) -> NamespaceGuard {
    NamespaceGuard::new(NamespaceResolver::authorization_only(
        NamespaceMode::SharedEngine,
        ownership,
        StaticScheduleNamespaces::default(),
    ))
}
/// Builds a shared-engine guard that uses `schedule_ownership` for schedules and a
/// default workflow ownership source.
fn guard_with_schedule_ownership(
    schedule_ownership: impl ScheduleNamespaceSource + 'static,
) -> NamespaceGuard {
    NamespaceGuard::new(NamespaceResolver::authorization_only(
        NamespaceMode::SharedEngine,
        StaticWorkflowNamespaces::default(),
        schedule_ownership,
    ))
}
/// Caller named "alice" constructed with the single namespace "tenant-a".
fn caller() -> CallerIdentity {
    let namespaces = [String::from("tenant-a")];
    CallerIdentity::new("alice", namespaces)
}
/// Fixed workflow id (UUID value 1) and run id (UUID value 2) shared by the tests.
fn workflow_ids() -> (WorkflowId, RunId) {
    let workflow_id = WorkflowId::new(uuid::Uuid::from_u128(1));
    let run_id = RunId::new(uuid::Uuid::from_u128(2));
    (workflow_id, run_id)
}
/// Signal, query, cancel and describe against a workflow recorded under "tenant-b" are
/// rejected with `NotFound` when the request names "tenant-a".
///
/// The fake engine is never handed to the guard in this test, so the final assertion on
/// its call list holds trivially.
#[tokio::test]
async fn denied_targeted_operations_do_not_call_engine()
-> Result<(), Box<dyn std::error::Error>> {
    let fake = RecordingFakeEngine::new();
    let (workflow_id, run_id) = workflow_ids();

    // Ownership says tenant-b, while every request below asks for tenant-a.
    let foreign_ownership = StaticWorkflowNamespaces::default();
    foreign_ownership.record(workflow_id.clone(), "tenant-b")?;
    let guard = guard_with_ownership(foreign_ownership);

    let signal = ProtoSignalRequest {
        namespace: String::from("tenant-a"),
        workflow_id: None,
        run_id: None,
        signal_name: String::from("ship"),
        payload: None,
    };
    let query = ProtoQueryRequest {
        namespace: String::from("tenant-a"),
        workflow_id: None,
        run_id: None,
        query_name: String::from("state"),
    };
    let cancel = ProtoCancelRequest {
        namespace: String::from("tenant-a"),
        workflow_id: None,
        run_id: None,
        reason: String::from("operator"),
    };
    let describe = ProtoDescribeWorkflowRequest {
        namespace: String::from("tenant-a"),
        workflow_id: None,
        run_id: None,
        include_history: false,
    };

    let target = WorkflowTarget::with_run(&workflow_id, &run_id);
    let targeted = [
        NamespaceOperation::signal(&signal, target),
        NamespaceOperation::query(&query, target),
        NamespaceOperation::cancel(&cancel, target),
        NamespaceOperation::describe(&describe, target),
    ];

    for operation in &targeted {
        let denied_code = guard
            .scope(&caller(), operation)
            .await
            .err()
            .map(|error| error.to_wire_error().code);
        assert_eq!(denied_code, Some(aion_proto::WireErrorCode::NotFound));
    }

    let recorded = fake.calls()?;
    assert!(recorded.is_empty());
    Ok(())
}
/// Listing workflows and registering a worker in "tenant-b" both fail for the test caller.
///
/// The fake engine is never handed to the guard in this test, so the final assertion on
/// its call list holds trivially.
#[tokio::test]
async fn denied_list_and_worker_scope_do_not_call_engine()
-> Result<(), Box<dyn std::error::Error>> {
    let fake = RecordingFakeEngine::new();
    let (workflow_id, _run_id) = workflow_ids();

    let ownership = StaticWorkflowNamespaces::default();
    ownership.record(workflow_id, "tenant-b")?;
    let guard = guard_with_ownership(ownership);

    let filter = WorkflowFilter::default();
    let list = ProtoListWorkflowsRequest {
        namespace: String::from("tenant-b"),
        filter: None,
    };
    let list_outcome = guard
        .scope(&caller(), &NamespaceOperation::list(&list, &filter))
        .await;
    assert!(list_outcome.is_err());

    let worker = ProtoRegisterWorker {
        namespace: String::from("tenant-b"),
        activity_types: vec![String::from("ship")],
    };
    let worker_outcome = guard
        .scope(&caller(), &NamespaceOperation::register_worker(&worker))
        .await;
    assert!(worker_outcome.is_err());

    let recorded = fake.calls()?;
    assert!(recorded.is_empty());
    Ok(())
}
/// Four subscription shapes are rejected, each with the wire code listed next to it:
/// a foreign namespace selector, a filtered subscription whose event filter names a
/// foreign workflow, a per-workflow subscription on a foreign workflow, and a firehose
/// on a namespace other than the caller's.
///
/// The fake engine is never handed to the guard in this test, so the final assertion on
/// its call list holds trivially.
#[tokio::test]
async fn denied_subscriptions_do_not_call_engine() -> Result<(), Box<dyn std::error::Error>> {
    let fake = RecordingFakeEngine::new();
    let (workflow_id, run_id) = workflow_ids();

    // The workflow is recorded under tenant-b; the caller is built with tenant-a.
    let ownership = StaticWorkflowNamespaces::default();
    ownership.record(workflow_id.clone(), "tenant-b")?;
    let guard = guard_with_ownership(ownership);

    let unfiltered_events = aion::EventFilter::default();
    let foreign_workflow_events = aion::EventFilter {
        workflow_id: Some(workflow_id.clone()),
        run: None,
        family: None,
    };

    let foreign_selector = FilteredSubscription {
        namespace: String::from("tenant-a"),
        workflow_type: None,
        status: None,
        namespace_selector: Some(String::from("tenant-b")),
    };
    let no_selector = FilteredSubscription {
        namespace: String::from("tenant-a"),
        workflow_type: None,
        status: None,
        namespace_selector: None,
    };
    let per_workflow = PerWorkflowSubscription {
        namespace: String::from("tenant-a"),
        workflow_id: None,
        resume_from_seq: None,
    };
    let foreign_firehose = FirehoseSubscription {
        namespace: String::from("tenant-b"),
    };

    let target = WorkflowTarget::with_run(&workflow_id, &run_id);
    let cases = [
        (
            NamespaceOperation::subscribe(
                SubscriptionScope::Filtered(&foreign_selector),
                &unfiltered_events,
            ),
            aion_proto::WireErrorCode::NamespaceDenied,
        ),
        (
            NamespaceOperation::subscribe(
                SubscriptionScope::Filtered(&no_selector),
                &foreign_workflow_events,
            ),
            aion_proto::WireErrorCode::NotFound,
        ),
        (
            NamespaceOperation::subscribe(
                SubscriptionScope::PerWorkflow(&per_workflow, target),
                &foreign_workflow_events,
            ),
            aion_proto::WireErrorCode::NotFound,
        ),
        (
            NamespaceOperation::subscribe(
                SubscriptionScope::Firehose(&foreign_firehose),
                &unfiltered_events,
            ),
            aion_proto::WireErrorCode::NamespaceDenied,
        ),
    ];

    for (operation, expected_code) in &cases {
        let observed_code = guard
            .scope(&caller(), operation)
            .await
            .err()
            .map(|error| error.to_wire_error().code);
        assert_eq!(observed_code.as_ref(), Some(expected_code));
    }

    let recorded = fake.calls()?;
    assert!(recorded.is_empty());
    Ok(())
}
/// Fixed schedule id (UUID value 9) shared by the schedule tests.
fn schedule_id() -> ScheduleId {
    let raw = uuid::Uuid::from_u128(9);
    ScheduleId::new(raw)
}
/// Schedule-id request naming `namespace` and carrying no schedule id.
fn schedule_id_request(namespace: &str) -> ProtoScheduleIdRequest {
    let namespace = String::from(namespace);
    ProtoScheduleIdRequest {
        namespace,
        schedule_id: None,
    }
}
/// Targeted schedule operations in "tenant-a" against a schedule recorded under
/// "tenant-b" fail with `NotFound` and the expected message, and each operation performs
/// exactly one ownership lookup.
#[tokio::test]
async fn schedule_ownership_misses_are_not_found_and_do_not_call_engine()
-> Result<(), Box<dyn std::error::Error>> {
    let schedule_id = schedule_id();

    let foreign_ownership = StaticScheduleNamespaces::default();
    foreign_ownership.record(schedule_id.clone(), "tenant-b")?;
    let counting = CountingScheduleNamespaces::wrapping(foreign_ownership);
    // The guard receives a clone; both share the lookup counter.
    let guard = guard_with_schedule_ownership(counting.clone());

    let update = ProtoUpdateScheduleRequest {
        namespace: String::from("tenant-a"),
        schedule_id: None,
        config: None,
    };
    let id_request = schedule_id_request("tenant-a");

    let target = ScheduleTarget::schedule(&schedule_id);
    let operations = [
        NamespaceOperation::update_schedule(&update, target),
        NamespaceOperation::pause_schedule(&id_request, target),
        NamespaceOperation::resume_schedule(&id_request, target),
        NamespaceOperation::delete_schedule(&id_request, target),
        NamespaceOperation::describe_schedule(&id_request, target),
    ];

    for operation in &operations {
        let Err(rejection) = guard.scope(&caller(), operation).await else {
            return Err("expected foreign-owned schedule to be rejected".into());
        };
        let wire_error = rejection.to_wire_error();
        assert_eq!(wire_error.code, aion_proto::WireErrorCode::NotFound);
        assert_eq!(
            wire_error.message,
            "schedule not found in namespace tenant-a"
        );
    }

    assert_eq!(counting.calls(), operations.len());
    Ok(())
}
/// Every schedule operation requested in "tenant-b" is rejected with `NamespaceDenied`
/// for the test caller, and no ownership lookup is made at all.
#[tokio::test]
async fn ungranted_schedule_operations_are_namespace_denied()
-> Result<(), Box<dyn std::error::Error>> {
    let schedule_id = schedule_id();

    let ownership = StaticScheduleNamespaces::default();
    ownership.record(schedule_id.clone(), "tenant-b")?;
    let counting = CountingScheduleNamespaces::wrapping(ownership);
    // The guard receives a clone; both share the lookup counter.
    let guard = guard_with_schedule_ownership(counting.clone());

    let create = ProtoCreateScheduleRequest {
        namespace: String::from("tenant-b"),
        config: None,
    };
    let update = ProtoUpdateScheduleRequest {
        namespace: String::from("tenant-b"),
        schedule_id: None,
        config: None,
    };
    let list = ProtoListSchedulesRequest {
        namespace: String::from("tenant-b"),
    };
    let id_request = schedule_id_request("tenant-b");

    let target = ScheduleTarget::schedule(&schedule_id);
    let operations = [
        NamespaceOperation::create_schedule(&create),
        NamespaceOperation::update_schedule(&update, target),
        NamespaceOperation::pause_schedule(&id_request, target),
        NamespaceOperation::resume_schedule(&id_request, target),
        NamespaceOperation::delete_schedule(&id_request, target),
        NamespaceOperation::describe_schedule(&id_request, target),
        NamespaceOperation::list_schedules(&list),
    ];

    for operation in &operations {
        let denied_code = guard
            .scope(&caller(), operation)
            .await
            .err()
            .map(|error| error.to_wire_error().code);
        assert_eq!(
            denied_code,
            Some(aion_proto::WireErrorCode::NamespaceDenied)
        );
    }

    assert_eq!(counting.calls(), 0);
    Ok(())
}
/// Creating and listing schedules in "tenant-a" both succeed for the test caller and
/// yield a scope whose namespace is "tenant-a".
#[tokio::test]
async fn granted_schedule_create_and_list_return_scoped_engine()
-> Result<(), Box<dyn std::error::Error>> {
    let guard = guard_with_schedule_ownership(StaticScheduleNamespaces::default());

    let create = ProtoCreateScheduleRequest {
        namespace: String::from("tenant-a"),
        config: None,
    };
    let create_operation = NamespaceOperation::create_schedule(&create);
    let scoped_create = guard.scope(&caller(), &create_operation).await?;

    let list = ProtoListSchedulesRequest {
        namespace: String::from("tenant-a"),
    };
    let list_operation = NamespaceOperation::list_schedules(&list);
    let scoped_list = guard.scope(&caller(), &list_operation).await?;

    assert_eq!(scoped_create.namespace(), "tenant-a");
    assert_eq!(scoped_list.namespace(), "tenant-a");
    Ok(())
}
/// Starting a workflow in "tenant-a" succeeds for the test caller and yields a scope
/// whose namespace is "tenant-a".
#[tokio::test]
async fn authorized_start_returns_scoped_engine() -> Result<(), Box<dyn std::error::Error>> {
    let guard = guard_with_ownership(StaticWorkflowNamespaces::default());

    let start = ProtoStartWorkflowRequest {
        namespace: String::from("tenant-a"),
        workflow_type: String::from("checkout"),
        input: None,
    };
    let operation = NamespaceOperation::start(&start);

    let scoped = guard.scope(&caller(), &operation).await?;
    assert_eq!(scoped.namespace(), "tenant-a");
    Ok(())
}
/// In single-tenant mode configured for "tenant-a", a caller built with an empty
/// namespace list can register a worker in "tenant-a".
#[tokio::test]
async fn single_tenant_mode_authorizes_configured_namespace()
-> Result<(), Box<dyn std::error::Error>> {
    let mode = NamespaceMode::SingleTenant {
        namespace: String::from("tenant-a"),
    };
    let guard = NamespaceGuard::new(NamespaceResolver::authorization_only(
        mode,
        StaticWorkflowNamespaces::default(),
        StaticScheduleNamespaces::default(),
    ));

    let register = ProtoRegisterWorker {
        namespace: String::from("tenant-a"),
        activity_types: Vec::new(),
    };
    let operation = NamespaceOperation::register_worker(&register);
    let ungranted_caller = CallerIdentity::new("single-tenant", Vec::<String>::new());

    let scoped = guard.scope(&ungranted_caller, &operation).await?;
    assert_eq!(scoped.namespace(), "tenant-a");
    Ok(())
}
}