use std::sync::Arc;
use aion_store::StoreError;
use aion_store::visibility::{ListWorkflowsFilter, VisibilityStore, WorkflowSummary};
/// Workflow type names that are treated as internal.
///
/// The helpers in this file hide workflows of these types from listings and
/// subtract them from counts.
const INTERNAL_WORKFLOW_TYPES: &[&str] = &["aion.schedule_coordinator"];

/// Reports whether `workflow_type` is one of [`INTERNAL_WORKFLOW_TYPES`].
///
/// The comparison is an exact, case-sensitive string match; a name that only
/// shares a prefix or suffix with an internal type is not considered internal.
pub(crate) fn is_internal_workflow_type(workflow_type: &str) -> bool {
    INTERNAL_WORKFLOW_TYPES
        .iter()
        .any(|&internal| internal == workflow_type)
}
/// Removes, in place, every summary whose workflow type is internal.
///
/// Summaries of all other workflow types are kept in their original order.
pub(crate) fn retain_user_workflows(summaries: &mut Vec<WorkflowSummary>) {
    summaries.retain(|entry| {
        let hidden = is_internal_workflow_type(&entry.workflow_type);
        !hidden
    });
}
pub(crate) async fn count_user_workflows(
store: &Arc<dyn VisibilityStore>,
filter: ListWorkflowsFilter,
) -> Result<u64, StoreError> {
if let Some(workflow_type) = &filter.workflow_type {
if is_internal_workflow_type(workflow_type) {
return Ok(0);
}
return store.count_workflows(filter).await;
}
let mut count = store.count_workflows(filter.clone()).await?;
for internal_type in INTERNAL_WORKFLOW_TYPES {
let mut internal_filter = filter.clone();
internal_filter.workflow_type = Some((*internal_type).to_owned());
count = count.saturating_sub(store.count_workflows(internal_filter).await?);
}
Ok(count)
}
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use aion_core::{RunId, WorkflowId, WorkflowStatus};
    use aion_store::InMemoryStore;
    use aion_store::visibility::VisibilityRecord;
    use chrono::Utc;

    use super::*;

    /// Builds a running, still-open visibility record of the given type.
    ///
    /// The workflow id is derived from `id` and the run id from `id + 100`.
    fn record(workflow_type: &str, id: u128) -> VisibilityRecord {
        let workflow_id = WorkflowId::new(uuid::Uuid::from_u128(id));
        let run_id = RunId::new(uuid::Uuid::from_u128(id + 100));
        VisibilityRecord {
            workflow_id,
            run_id,
            workflow_type: workflow_type.to_owned(),
            status: WorkflowStatus::Running,
            start_time: Utc::now(),
            close_time: None,
            search_attributes: HashMap::new(),
        }
    }

    /// Builds a filter that restricts only the workflow type.
    fn filter_for_type(workflow_type: &str) -> ListWorkflowsFilter {
        ListWorkflowsFilter {
            workflow_type: Some(String::from(workflow_type)),
            ..ListWorkflowsFilter::default()
        }
    }

    #[test]
    fn predicate_matches_only_internal_types() {
        // (candidate type, whether it should be classified as internal)
        let cases = [
            ("aion.schedule_coordinator", true),
            ("checkout", false),
            ("schedule_coordinator", false),
        ];
        for (candidate, expected) in cases {
            assert_eq!(
                is_internal_workflow_type(candidate),
                expected,
                "unexpected classification for {:?}",
                candidate
            );
        }
    }

    #[tokio::test]
    async fn count_excludes_internal_workflows() -> Result<(), Box<dyn std::error::Error>> {
        let store: Arc<dyn VisibilityStore> = Arc::new(InMemoryStore::default());

        // Seed one user workflow and one internal workflow.
        for (workflow_type, id) in [("checkout", 1), ("aion.schedule_coordinator", 2)] {
            store.record_visibility(record(workflow_type, id)).await?;
        }

        // No type filter: the internal workflow is subtracted from the total.
        let unfiltered = count_user_workflows(&store, ListWorkflowsFilter::default()).await?;
        assert_eq!(unfiltered, 1);

        // Filtering on a user type counts that type as-is.
        let user_typed = count_user_workflows(&store, filter_for_type("checkout")).await?;
        assert_eq!(user_typed, 1);

        // Filtering on an internal type always yields zero.
        let internal_typed =
            count_user_workflows(&store, filter_for_type("aion.schedule_coordinator")).await?;
        assert_eq!(internal_typed, 0);

        Ok(())
    }

    #[test]
    fn retain_drops_internal_summaries() {
        let mut summaries = vec![
            WorkflowSummary::from(record("checkout", 1)),
            WorkflowSummary::from(record("aion.schedule_coordinator", 2)),
        ];

        retain_user_workflows(&mut summaries);

        // Exactly one summary survives, and it is the user workflow.
        assert_eq!(summaries.len(), 1);
        assert!(summaries
            .iter()
            .all(|summary| summary.workflow_type == "checkout"));
    }
}