use aion::EventFilter;
use aion_core::WorkflowId;
use aion_proto::{ProtoWorkflowId, ProtoWorkflowStatus, SubscriptionRequest, subscription_request};
use futures::stream::BoxStream;
use crate::error::ServerError;
use crate::namespace::{
CallerIdentity, NamespaceGuard, NamespaceOperation, SubscriptionScope, WorkflowTarget,
};
use crate::stream::selector::SubscriptionSelector;
/// Outcome of a successful [`subscribe_events`] call: the resolved subscription
/// parameters together with the events to deliver to the subscriber.
pub struct EventSubscription {
/// Namespace reported by the guard's scoped handle for this subscription.
pub namespace: String,
/// Engine-level filter that the live subscription was opened with.
pub filter: EventFilter,
/// Selector mapped from the request (workflow type / status for filtered
/// subscriptions, unrestricted otherwise). It is not applied in this module.
// NOTE(review): presumably enforced by the stream layer downstream; confirm.
pub selector: SubscriptionSelector,
/// Workflow the subscription is pinned to; `Some` only for per-workflow requests.
pub workflow_target: Option<WorkflowId>,
/// Events produced by `resume::splice` when a per-workflow request carries a
/// resume cursor; empty for every other subscription.
// NOTE(review): presumably meant to be sent before `events`; confirm with the consumer.
pub replay: Vec<aion_core::Event>,
/// Event stream handed to the subscriber. Each item is either an event or an
/// `aion::EventStreamLagged` error.
pub events: BoxStream<'static, Result<aion_core::Event, aion::EventStreamLagged>>,
}
pub async fn subscribe_events(
guard: &NamespaceGuard,
caller: &CallerIdentity,
request: &SubscriptionRequest,
) -> Result<EventSubscription, ServerError> {
let mapped = map_subscription_request(request)?;
let target = mapped
.workflow_target
.as_ref()
.map(WorkflowTarget::workflow);
let scope = SubscriptionScope::from_request(request, target)?;
let operation = NamespaceOperation::subscribe(scope, &mapped.filter);
let scoped = guard.scope(caller, &operation).await?;
let engine = scoped.engine()?;
let live = engine.subscribe(mapped.filter.clone());
let (replay, events) = match (mapped.workflow_target.as_ref(), mapped.resume_from) {
(Some(workflow_id), Some(resume_from_seq)) => {
let history = engine.store().read_history(workflow_id).await?;
super::resume::splice(live, history, resume_from_seq)?
}
_ => (Vec::new(), live),
};
Ok(EventSubscription {
namespace: scoped.namespace().to_owned(),
filter: mapped.filter,
selector: mapped.selector,
workflow_target: mapped.workflow_target,
replay,
events,
})
}
/// Server-side form of a `SubscriptionRequest`, as produced by
/// [`map_subscription_request`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct MappedSubscription {
/// Engine filter: pinned to one workflow id for per-workflow requests, the
/// default filter for filtered and firehose requests.
pub filter: EventFilter,
/// Workflow type / status selector for filtered requests; unrestricted for
/// per-workflow and firehose requests.
pub selector: SubscriptionSelector,
/// Decoded workflow id for per-workflow requests, `None` otherwise.
pub workflow_target: Option<WorkflowId>,
/// Resume cursor copied from the per-workflow request's `resume_from_seq`;
/// always `None` for filtered and firehose requests.
pub resume_from: Option<u64>,
}
/// Translates a wire-level subscription request into a [`MappedSubscription`].
///
/// * Per-workflow: the filter is pinned to the decoded workflow id, the
///   selector is unrestricted, and the request's resume cursor is carried over.
/// * Filtered: default filter, with a selector built from the requested
///   workflow type and decoded status; no resume cursor.
/// * Firehose: default filter, unrestricted selector; no resume cursor.
///
/// # Errors
///
/// Returns `ServerError::Wire` when the request has no subscription variant,
/// when a per-workflow request has a missing or undecodable workflow id, or
/// when a filtered request carries an invalid status.
pub fn map_subscription_request(
    request: &SubscriptionRequest,
) -> Result<MappedSubscription, ServerError> {
    let Some(variant) = request.subscription.as_ref() else {
        return Err(ServerError::Wire {
            wire: aion_proto::WireError::backend("subscription variant is missing"),
        });
    };

    let mapped = match variant {
        subscription_request::Subscription::PerWorkflow(per_workflow) => {
            let target = decode_workflow_id(per_workflow.workflow_id.clone())?;
            let filter = EventFilter {
                workflow_id: Some(target.clone()),
                ..EventFilter::default()
            };
            MappedSubscription {
                filter,
                selector: SubscriptionSelector::unrestricted(),
                workflow_target: Some(target),
                resume_from: per_workflow.resume_from_seq,
            }
        }
        subscription_request::Subscription::Filtered(filtered) => {
            // Validate the status before building anything else.
            let status = decode_status(filtered.status)?;
            let selector = SubscriptionSelector {
                workflow_type: filtered.workflow_type.clone(),
                status,
            };
            MappedSubscription {
                filter: EventFilter::default(),
                selector,
                workflow_target: None,
                resume_from: None,
            }
        }
        subscription_request::Subscription::Firehose(_) => MappedSubscription {
            filter: EventFilter::default(),
            selector: SubscriptionSelector::unrestricted(),
            workflow_target: None,
            resume_from: None,
        },
    };

    Ok(mapped)
}
/// Converts the optional wire workflow id of a per-workflow subscription into
/// a core `WorkflowId`.
///
/// # Errors
///
/// Returns `ServerError::Wire` when the id is absent or when the wire-to-core
/// conversion fails.
fn decode_workflow_id(workflow_id: Option<ProtoWorkflowId>) -> Result<WorkflowId, ServerError> {
    let Some(proto_id) = workflow_id else {
        return Err(ServerError::Wire {
            wire: aion_proto::WireError::backend("per-workflow subscription id is missing"),
        });
    };

    proto_id
        .try_into()
        .map_err(|wire| ServerError::Wire { wire })
}
/// Decodes the optional raw status of a filtered subscription.
///
/// An absent status maps to `Ok(None)`. A present one is first converted to
/// `ProtoWorkflowStatus` and then to `aion_core::WorkflowStatus`.
///
/// # Errors
///
/// Returns `ServerError::Wire` when the raw value is not a known proto status
/// or when the proto-to-core conversion fails.
fn decode_status(status: Option<i32>) -> Result<Option<aion_core::WorkflowStatus>, ServerError> {
    match status {
        None => Ok(None),
        Some(raw) => {
            let proto = ProtoWorkflowStatus::try_from(raw).map_err(|_| ServerError::Wire {
                wire: aion_proto::WireError::backend("workflow status is invalid"),
            })?;
            aion_core::WorkflowStatus::try_from(proto)
                .map(Some)
                .map_err(|wire| ServerError::Wire { wire })
        }
    }
}
// Unit tests for request mapping and for the ordering of namespace
// authorization relative to engine access and cursor handling.
#[cfg(test)]
mod tests {
use aion_core::WorkflowId;
use aion_proto::{
FilteredSubscription, FirehoseSubscription, PerWorkflowSubscription, ProtoWorkflowId,
SubscriptionRequest, subscription_request,
};
use super::map_subscription_request;
use crate::config::NamespaceMode;
use crate::namespace::{
CallerIdentity, NamespaceGuard, NamespaceResolver, StaticScheduleNamespaces,
StaticWorkflowNamespaces,
};
/// Returns a new workflow id from `WorkflowId::new_v4`.
fn workflow_id() -> WorkflowId {
WorkflowId::new_v4()
}
/// Builds a per-workflow request without a resume cursor.
fn per_workflow_request(workflow_id: WorkflowId, namespace: &str) -> SubscriptionRequest {
per_workflow_resume_request(workflow_id, namespace, None)
}
/// Builds a per-workflow request for `workflow_id` in `namespace`, with an
/// optional resume cursor.
fn per_workflow_resume_request(
workflow_id: WorkflowId,
namespace: &str,
resume_from_seq: Option<u64>,
) -> SubscriptionRequest {
SubscriptionRequest {
subscription: Some(subscription_request::Subscription::PerWorkflow(
PerWorkflowSubscription {
namespace: namespace.to_owned(),
workflow_id: Some(ProtoWorkflowId::from(workflow_id)),
resume_from_seq,
},
)),
}
}
/// Builds a filtered request with a fixed workflow type ("checkout") and a
/// fixed `Running` status; `selector` fills the optional namespace selector.
fn filtered_request(namespace: &str, selector: Option<&str>) -> SubscriptionRequest {
SubscriptionRequest {
subscription: Some(subscription_request::Subscription::Filtered(
FilteredSubscription {
namespace: namespace.to_owned(),
workflow_type: Some("checkout".to_owned()),
status: Some(aion_proto::ProtoWorkflowStatus::Running as i32),
namespace_selector: selector.map(str::to_owned),
},
)),
}
}
/// Builds a firehose request for `namespace`.
fn firehose_request(namespace: &str) -> SubscriptionRequest {
SubscriptionRequest {
subscription: Some(subscription_request::Subscription::Firehose(
FirehoseSubscription {
namespace: namespace.to_owned(),
},
)),
}
}
/// Guard backed by an authorization-only resolver in shared-engine mode with
/// default workflow and schedule namespace tables.
fn guard() -> NamespaceGuard {
NamespaceGuard::new(NamespaceResolver::authorization_only(
NamespaceMode::SharedEngine,
StaticWorkflowNamespaces::default(),
StaticScheduleNamespaces::default(),
))
}
/// Caller "alice" associated with "tenant-a".
// NOTE(review): the second argument is presumably the set of namespaces the
// caller may access; confirm against `CallerIdentity::new`.
fn caller() -> CallerIdentity {
CallerIdentity::new("alice", ["tenant-a".to_owned()])
}
// A per-workflow request pins both the filter and the workflow target to the
// requested id and leaves the filter's `run` and `family` fields unset.
#[test]
fn maps_per_workflow_subscription_to_workflow_filter() -> Result<(), Box<dyn std::error::Error>>
{
let workflow_id = workflow_id();
let request = per_workflow_request(workflow_id.clone(), "tenant-a");
let mapped = map_subscription_request(&request)?;
assert_eq!(mapped.filter.workflow_id, Some(workflow_id.clone()));
assert_eq!(mapped.workflow_target, Some(workflow_id));
assert!(mapped.filter.run.is_none());
assert!(mapped.filter.family.is_none());
Ok(())
}
// A filtered request keeps the default engine filter and carries the workflow
// type and decoded status in the selector.
#[test]
fn maps_filtered_subscription_selectors_into_the_server_side_selector()
-> Result<(), Box<dyn std::error::Error>> {
let mapped = map_subscription_request(&filtered_request("tenant-a", Some("tenant-a")))?;
assert_eq!(mapped.filter, aion::EventFilter::default());
assert!(mapped.workflow_target.is_none());
assert_eq!(
mapped.selector,
crate::stream::selector::SubscriptionSelector {
workflow_type: Some("checkout".to_owned()),
status: Some(aion_core::WorkflowStatus::Running),
}
);
Ok(())
}
// A firehose request maps to the default filter and an unrestricted selector.
#[test]
fn maps_firehose_subscription_to_engine_firehose_filter()
-> Result<(), Box<dyn std::error::Error>> {
let mapped = map_subscription_request(&firehose_request("tenant-a"))?;
assert_eq!(mapped.filter, aion::EventFilter::default());
assert!(mapped.workflow_target.is_none());
assert_eq!(
mapped.selector,
crate::stream::selector::SubscriptionSelector::unrestricted()
);
Ok(())
}
// A filtered request in "tenant-a" whose namespace selector names "tenant-b"
// must be rejected by the guard with a namespace error.
#[tokio::test]
async fn cross_namespace_subscription_selector_is_denied_before_engine_access()
-> Result<(), Box<dyn std::error::Error>> {
let request = filtered_request("tenant-a", Some("tenant-b"));
let mapped = map_subscription_request(&request)?;
let scope = crate::namespace::SubscriptionScope::from_request(&request, None)?;
let operation = crate::namespace::NamespaceOperation::subscribe(scope, &mapped.filter);
let error = guard().scope(&caller(), &operation).await.err();
assert!(matches!(error, Some(crate::ServerError::Namespace { .. })));
Ok(())
}
// A firehose request for "tenant-b" from the "tenant-a" caller must be rejected
// by the guard with a namespace error.
#[tokio::test]
async fn cross_namespace_firehose_is_denied_before_engine_access()
-> Result<(), Box<dyn std::error::Error>> {
let request = firehose_request("tenant-b");
let mapped = map_subscription_request(&request)?;
let scope = crate::namespace::SubscriptionScope::from_request(&request, None)?;
let operation = crate::namespace::NamespaceOperation::subscribe(scope, &mapped.filter);
let error = guard().scope(&caller(), &operation).await.err();
assert!(matches!(error, Some(crate::ServerError::Namespace { .. })));
Ok(())
}
// Only per-workflow requests carry a resume cursor through mapping; filtered
// and firehose requests always map to `resume_from == None`.
#[test]
fn resume_cursor_is_carried_for_per_workflow_subscriptions_only()
-> Result<(), Box<dyn std::error::Error>> {
let with_cursor = map_subscription_request(&per_workflow_resume_request(
workflow_id(),
"tenant-a",
Some(42),
))?;
let without_cursor =
map_subscription_request(&per_workflow_request(workflow_id(), "tenant-a"))?;
let filtered = map_subscription_request(&filtered_request("tenant-a", None))?;
let firehose = map_subscription_request(&firehose_request("tenant-a"))?;
assert_eq!(with_cursor.resume_from, Some(42));
assert_eq!(without_cursor.resume_from, None);
assert_eq!(filtered.resume_from, None, "filtered streams are live-only");
assert_eq!(firehose.resume_from, None, "firehose streams are live-only");
Ok(())
}
// Probing a workflow recorded under "tenant-b" while requesting "tenant-a" must
// fail with the same wire error whatever resume cursor is supplied, and that
// error must equal the one returned for a workflow id that was never recorded.
#[tokio::test]
async fn guard_verdict_precedes_cursor_validation_for_foreign_workflows()
-> Result<(), Box<dyn std::error::Error>> {
let foreign_workflow = workflow_id();
let ownership = crate::namespace::StaticWorkflowNamespaces::default();
// Register the workflow as belonging to "tenant-b".
ownership.record(foreign_workflow.clone(), "tenant-b")?;
let guard = NamespaceGuard::new(NamespaceResolver::authorization_only(
NamespaceMode::SharedEngine,
ownership,
StaticScheduleNamespaces::default(),
));
let caller = caller();
let mut wire_errors = Vec::new();
// Sweep the cursor values, including the boundaries and "no cursor".
for cursor in [Some(0), Some(1), Some(u64::MAX), None] {
let request = per_workflow_resume_request(foreign_workflow.clone(), "tenant-a", cursor);
let error = super::subscribe_events(&guard, &caller, &request)
.await
.err()
.map(|error| error.to_wire_error())
.ok_or_else(|| format!("foreign probe with cursor {cursor:?} must be rejected"))?;
wire_errors.push(error);
}
// Baseline: a workflow id that was never recorded in `ownership`.
let absent = super::subscribe_events(
&guard,
&caller,
&per_workflow_resume_request(WorkflowId::new_v4(), "tenant-a", Some(u64::MAX)),
)
.await
.err()
.map(|error| error.to_wire_error())
.ok_or("nonexistent-workflow probe must be rejected")?;
assert_eq!(absent.code, aion_proto::WireErrorCode::NotFound);
// Every foreign probe must compare equal to the baseline error.
for error in &wire_errors {
assert_eq!(
error, &absent,
"every foreign probe must be byte-identical to the nonexistent probe"
);
}
Ok(())
}
}