1use std::collections::HashMap;
4
5use aion_core::{RunId, SearchAttributeValue, WorkflowId, WorkflowStatus};
6use async_trait::async_trait;
7use chrono::{DateTime, Utc};
8use serde::{Deserialize, Serialize};
9
10use crate::StoreError;
11
12#[async_trait]
14pub trait VisibilityStore: Send + Sync + 'static {
15 async fn record_visibility(&self, record: VisibilityRecord) -> Result<(), StoreError>;
24
25 async fn list_workflows(
34 &self,
35 filter: ListWorkflowsFilter,
36 ) -> Result<Vec<WorkflowSummary>, StoreError>;
37
38 async fn count_workflows(&self, filter: ListWorkflowsFilter) -> Result<u64, StoreError>;
44}
45
46#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
48pub struct VisibilityRecord {
49 pub workflow_id: WorkflowId,
51 pub run_id: RunId,
53 pub workflow_type: String,
55 pub status: WorkflowStatus,
57 pub start_time: DateTime<Utc>,
59 pub close_time: Option<DateTime<Utc>>,
61 pub search_attributes: HashMap<String, SearchAttributeValue>,
63}
64
65impl From<VisibilityRecord> for WorkflowSummary {
66 fn from(record: VisibilityRecord) -> Self {
67 Self {
68 workflow_id: record.workflow_id,
69 run_id: record.run_id,
70 workflow_type: record.workflow_type,
71 status: record.status,
72 start_time: record.start_time,
73 close_time: record.close_time,
74 search_attributes: record.search_attributes,
75 }
76 }
77}
78
79#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
81pub struct WorkflowSummary {
82 pub workflow_id: WorkflowId,
84 pub run_id: RunId,
86 pub workflow_type: String,
88 pub status: WorkflowStatus,
90 pub start_time: DateTime<Utc>,
92 pub close_time: Option<DateTime<Utc>>,
94 pub search_attributes: HashMap<String, SearchAttributeValue>,
96}
97
98#[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq)]
104pub struct ListWorkflowsFilter {
105 pub workflow_type: Option<String>,
107 pub status: Option<WorkflowStatus>,
109 pub started_after: Option<DateTime<Utc>>,
111 pub started_before: Option<DateTime<Utc>>,
113 pub closed_after: Option<DateTime<Utc>>,
115 pub closed_before: Option<DateTime<Utc>>,
117 pub search_attributes: Vec<SearchAttributePredicate>,
119 pub limit: Option<u32>,
121 pub offset: Option<u32>,
123}
124
125impl ListWorkflowsFilter {
126 #[must_use]
131 pub fn matches(&self, summary: &WorkflowSummary) -> bool {
132 self.matches_workflow_type(summary)
133 && self.matches_status(summary)
134 && self.matches_started_after(summary)
135 && self.matches_started_before(summary)
136 && self.matches_closed_after(summary)
137 && self.matches_closed_before(summary)
138 && self.matches_search_attributes(summary)
139 }
140
141 fn matches_workflow_type(&self, summary: &WorkflowSummary) -> bool {
142 self.workflow_type
143 .as_ref()
144 .is_none_or(|workflow_type| workflow_type == &summary.workflow_type)
145 }
146
147 fn matches_status(&self, summary: &WorkflowSummary) -> bool {
148 self.status.is_none_or(|status| status == summary.status)
149 }
150
151 fn matches_started_after(&self, summary: &WorkflowSummary) -> bool {
152 self.started_after
153 .is_none_or(|started_after| summary.start_time >= started_after)
154 }
155
156 fn matches_started_before(&self, summary: &WorkflowSummary) -> bool {
157 self.started_before
158 .is_none_or(|started_before| summary.start_time <= started_before)
159 }
160
161 fn matches_closed_after(&self, summary: &WorkflowSummary) -> bool {
162 self.closed_after.is_none_or(|closed_after| {
163 summary
164 .close_time
165 .is_some_and(|close_time| close_time >= closed_after)
166 })
167 }
168
169 fn matches_closed_before(&self, summary: &WorkflowSummary) -> bool {
170 self.closed_before.is_none_or(|closed_before| {
171 summary
172 .close_time
173 .is_some_and(|close_time| close_time <= closed_before)
174 })
175 }
176
177 fn matches_search_attributes(&self, summary: &WorkflowSummary) -> bool {
178 self.search_attributes
179 .iter()
180 .all(|predicate| predicate.matches(summary))
181 }
182}
183
184#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
186pub enum SearchAttributePredicate {
187 Equals {
189 name: String,
191 value: SearchAttributeValue,
193 },
194 GreaterThan {
196 name: String,
198 value: SearchAttributeValue,
200 },
201 LessThan {
203 name: String,
205 value: SearchAttributeValue,
207 },
208 Contains {
210 name: String,
212 keyword: String,
214 },
215}
216
217impl SearchAttributePredicate {
218 #[must_use]
224 pub fn matches(&self, summary: &WorkflowSummary) -> bool {
225 match self {
226 Self::Equals { name, value } => summary
227 .search_attributes
228 .get(name)
229 .is_some_and(|stored| stored == value),
230 Self::GreaterThan { name, value } => summary
231 .search_attributes
232 .get(name)
233 .is_some_and(|stored| stored_greater_than(stored, value)),
234 Self::LessThan { name, value } => summary
235 .search_attributes
236 .get(name)
237 .is_some_and(|stored| stored_less_than(stored, value)),
238 Self::Contains { name, keyword } => {
239 summary
240 .search_attributes
241 .get(name)
242 .is_some_and(|stored| match stored {
243 SearchAttributeValue::KeywordList(keywords) => keywords.contains(keyword),
244 _ => false,
245 })
246 }
247 }
248 }
249}
250
251#[must_use]
252fn stored_greater_than(stored: &SearchAttributeValue, value: &SearchAttributeValue) -> bool {
253 match (stored, value) {
254 (SearchAttributeValue::Int(stored), SearchAttributeValue::Int(value)) => stored > value,
255 (SearchAttributeValue::Float(stored), SearchAttributeValue::Float(value)) => stored > value,
256 (SearchAttributeValue::Datetime(stored), SearchAttributeValue::Datetime(value)) => {
257 stored > value
258 }
259 _ => false,
260 }
261}
262
263#[must_use]
264fn stored_less_than(stored: &SearchAttributeValue, value: &SearchAttributeValue) -> bool {
265 match (stored, value) {
266 (SearchAttributeValue::Int(stored), SearchAttributeValue::Int(value)) => stored < value,
267 (SearchAttributeValue::Float(stored), SearchAttributeValue::Float(value)) => stored < value,
268 (SearchAttributeValue::Datetime(stored), SearchAttributeValue::Datetime(value)) => {
269 stored < value
270 }
271 _ => false,
272 }
273}
274
#[cfg(test)]
mod tests {
    use std::{collections::HashMap, sync::Arc};

    use aion_core::{RunId, SearchAttributeValue, WorkflowId, WorkflowStatus};
    use chrono::{DateTime, Utc};

    use super::{ListWorkflowsFilter, SearchAttributePredicate, VisibilityStore, WorkflowSummary};

    /// Compile-time check: the trait must stay usable as `dyn VisibilityStore`.
    #[test]
    fn visibility_store_is_object_safe() {
        let _: Option<Arc<dyn VisibilityStore>> = None;
    }

    #[test]
    fn default_filter_matches_all_workflows() {
        let summary = workflow_summary();

        assert!(ListWorkflowsFilter::default().matches(&summary));
    }

    #[test]
    fn workflow_type_filter_requires_exact_match() {
        let summary = workflow_summary();

        assert!(
            ListWorkflowsFilter {
                workflow_type: Some(String::from("example")),
                ..ListWorkflowsFilter::default()
            }
            .matches(&summary)
        );
        assert!(
            !ListWorkflowsFilter {
                workflow_type: Some(String::from("other")),
                ..ListWorkflowsFilter::default()
            }
            .matches(&summary)
        );
    }

    #[test]
    fn start_time_bounds_are_inclusive() {
        let summary = workflow_summary();

        assert!(
            ListWorkflowsFilter {
                started_after: Some(summary.start_time),
                started_before: Some(summary.start_time),
                ..ListWorkflowsFilter::default()
            }
            .matches(&summary)
        );
    }

    #[test]
    fn close_time_bounds_reject_workflows_without_close_time() {
        let summary = workflow_summary();

        assert!(
            !ListWorkflowsFilter {
                closed_after: Some(summary.start_time),
                ..ListWorkflowsFilter::default()
            }
            .matches(&summary)
        );
        assert!(
            !ListWorkflowsFilter {
                closed_before: Some(summary.start_time),
                ..ListWorkflowsFilter::default()
            }
            .matches(&summary)
        );
    }

    #[test]
    fn mismatched_typed_search_attribute_predicates_do_not_match() {
        let summary = workflow_summary();

        let greater_than_string = SearchAttributePredicate::GreaterThan {
            name: String::from("customer"),
            value: SearchAttributeValue::String(String::from("a")),
        };
        let contains_non_keyword_list = SearchAttributePredicate::Contains {
            name: String::from("attempts"),
            keyword: String::from("vip"),
        };

        assert!(
            !ListWorkflowsFilter {
                search_attributes: vec![greater_than_string],
                ..ListWorkflowsFilter::default()
            }
            .matches(&summary)
        );
        assert!(
            !ListWorkflowsFilter {
                search_attributes: vec![contains_non_keyword_list],
                ..ListWorkflowsFilter::default()
            }
            .matches(&summary)
        );
    }

    #[test]
    fn search_attribute_predicates_match_supported_typed_operations() {
        let summary = workflow_summary();

        assert!(
            SearchAttributePredicate::Equals {
                name: String::from("customer"),
                value: SearchAttributeValue::String(String::from("cust-1")),
            }
            .matches(&summary)
        );
        assert!(
            SearchAttributePredicate::GreaterThan {
                name: String::from("attempts"),
                value: SearchAttributeValue::Int(2),
            }
            .matches(&summary)
        );
        assert!(
            SearchAttributePredicate::LessThan {
                name: String::from("attempts"),
                value: SearchAttributeValue::Int(4),
            }
            .matches(&summary)
        );
        assert!(
            SearchAttributePredicate::Contains {
                name: String::from("tags"),
                keyword: String::from("vip"),
            }
            .matches(&summary)
        );
    }

    /// Builds a running, not-yet-closed workflow with one string, one integer
    /// and one keyword-list search attribute.
    fn workflow_summary() -> WorkflowSummary {
        let mut search_attributes = HashMap::new();
        search_attributes.insert(
            String::from("customer"),
            SearchAttributeValue::String(String::from("cust-1")),
        );
        search_attributes.insert(String::from("attempts"), SearchAttributeValue::Int(3));
        search_attributes.insert(
            String::from("tags"),
            SearchAttributeValue::KeywordList(vec![String::from("vip"), String::from("west")]),
        );

        WorkflowSummary {
            workflow_id: WorkflowId::new_v4(),
            run_id: RunId::new_v4(),
            workflow_type: String::from("example"),
            status: WorkflowStatus::Running,
            start_time: DateTime::<Utc>::default(),
            close_time: None,
            search_attributes,
        }
    }
}