use aion_core::{Event, WorkflowStatus};
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct SubscriptionSelector {
pub workflow_type: Option<String>,
pub status: Option<WorkflowStatus>,
}
impl SubscriptionSelector {
#[must_use]
pub const fn unrestricted() -> Self {
Self {
workflow_type: None,
status: None,
}
}
#[must_use]
pub fn matches(&self, event: &Event, workflow_type: Option<&str>) -> bool {
if let Some(selected_type) = &self.workflow_type {
if workflow_type != Some(selected_type.as_str()) {
return false;
}
}
if let Some(selected_status) = self.status {
if event_status(event) != selected_status {
return false;
}
}
true
}
}
const fn event_status(event: &Event) -> WorkflowStatus {
match event {
Event::WorkflowCompleted { .. } => WorkflowStatus::Completed,
Event::WorkflowFailed { .. } => WorkflowStatus::Failed,
Event::WorkflowCancelled { .. } => WorkflowStatus::Cancelled,
Event::WorkflowTimedOut { .. } => WorkflowStatus::TimedOut,
Event::WorkflowContinuedAsNew { .. } => WorkflowStatus::ContinuedAsNew,
Event::WorkflowStarted { .. }
| Event::SearchAttributesUpdated { .. }
| Event::ActivityScheduled { .. }
| Event::ActivityStarted { .. }
| Event::ActivityCompleted { .. }
| Event::ActivityFailed { .. }
| Event::ActivityCancelled { .. }
| Event::TimerStarted { .. }
| Event::TimerFired { .. }
| Event::TimerCancelled { .. }
| Event::WithTimeoutCompleted { .. }
| Event::SignalReceived { .. }
| Event::SignalSent { .. }
| Event::ChildWorkflowStarted { .. }
| Event::ChildWorkflowCompleted { .. }
| Event::ChildWorkflowFailed { .. }
| Event::ChildWorkflowCancelled { .. }
| Event::ScheduleCreated { .. }
| Event::ScheduleUpdated { .. }
| Event::SchedulePaused { .. }
| Event::ScheduleResumed { .. }
| Event::ScheduleDeleted { .. }
| Event::ScheduleTriggered { .. } => WorkflowStatus::Running,
}
}
#[cfg(test)]
mod tests {
use aion_core::{Event, EventEnvelope, Payload, WorkflowId, WorkflowStatus};
use super::SubscriptionSelector;
fn envelope(seq: u64) -> EventEnvelope {
EventEnvelope {
seq,
recorded_at: chrono::Utc::now(),
workflow_id: WorkflowId::new(uuid::Uuid::from_u128(1)),
}
}
fn payload() -> Result<Payload, aion_core::PayloadError> {
Payload::from_json(&serde_json::json!({ "label": "x" }))
}
fn signal(seq: u64) -> Result<Event, aion_core::PayloadError> {
Ok(Event::SignalReceived {
envelope: envelope(seq),
name: "ship".to_owned(),
payload: payload()?,
})
}
fn started(seq: u64) -> Result<Event, aion_core::PayloadError> {
Ok(Event::WorkflowStarted {
envelope: envelope(seq),
workflow_type: "checkout".to_owned(),
input: payload()?,
run_id: aion_core::RunId::new(uuid::Uuid::from_u128(1)),
parent_run_id: None,
package_version: aion_core::PackageVersion::new("a".repeat(64)),
})
}
fn completed(seq: u64) -> Result<Event, aion_core::PayloadError> {
Ok(Event::WorkflowCompleted {
envelope: envelope(seq),
result: payload()?,
})
}
fn failed(seq: u64) -> Event {
Event::WorkflowFailed {
envelope: envelope(seq),
error: aion_core::WorkflowError {
message: "boom".to_owned(),
details: None,
},
}
}
#[test]
fn unrestricted_selector_matches_everything() -> Result<(), Box<dyn std::error::Error>> {
let selector = SubscriptionSelector::unrestricted();
assert!(selector.matches(&signal(1)?, None));
assert!(selector.matches(&completed(2)?, Some("checkout")));
Ok(())
}
#[test]
fn type_selector_matches_only_the_recorded_type() -> Result<(), Box<dyn std::error::Error>> {
let selector = SubscriptionSelector {
workflow_type: Some("checkout".to_owned()),
status: None,
};
assert!(selector.matches(&signal(1)?, Some("checkout")));
assert!(!selector.matches(&signal(1)?, Some("fulfillment")));
assert!(
!selector.matches(&signal(1)?, None),
"a workflow with no recorded type never matches a type selector"
);
Ok(())
}
#[test]
fn status_selector_matches_per_event_kind() -> Result<(), Box<dyn std::error::Error>> {
let running = SubscriptionSelector {
workflow_type: None,
status: Some(WorkflowStatus::Running),
};
let completed_only = SubscriptionSelector {
workflow_type: None,
status: Some(WorkflowStatus::Completed),
};
let failed_only = SubscriptionSelector {
workflow_type: None,
status: Some(WorkflowStatus::Failed),
};
assert!(running.matches(&started(1)?, Some("checkout")));
assert!(running.matches(&signal(2)?, Some("checkout")));
assert!(!running.matches(&completed(3)?, Some("checkout")));
assert!(completed_only.matches(&completed(3)?, Some("checkout")));
assert!(!completed_only.matches(&failed(3), Some("checkout")));
assert!(!completed_only.matches(&signal(2)?, Some("checkout")));
assert!(failed_only.matches(&failed(3), Some("checkout")));
assert!(!failed_only.matches(&completed(3)?, Some("checkout")));
Ok(())
}
#[test]
fn combined_selectors_and_together() -> Result<(), Box<dyn std::error::Error>> {
let selector = SubscriptionSelector {
workflow_type: Some("checkout".to_owned()),
status: Some(WorkflowStatus::Completed),
};
assert!(selector.matches(&completed(3)?, Some("checkout")));
assert!(
!selector.matches(&completed(3)?, Some("fulfillment")),
"matching status with mismatched type must not pass"
);
assert!(
!selector.matches(&signal(2)?, Some("checkout")),
"matching type with mismatched status must not pass"
);
Ok(())
}
}