Skip to main content

aion_store/
visibility.rs

1//! Visibility-store contracts and query projection types.
2
3use std::collections::HashMap;
4
5use aion_core::{RunId, SearchAttributeValue, WorkflowId, WorkflowStatus};
6use async_trait::async_trait;
7use chrono::{DateTime, Utc};
8use serde::{Deserialize, Serialize};
9
10use crate::StoreError;
11
12/// Durable visibility-query contract for workflow visibility backends.
13#[async_trait]
14pub trait VisibilityStore: Send + Sync + 'static {
15    /// Upserts a workflow visibility record.
16    ///
17    /// Implementations should replace the visibility row for the workflow execution identified by
18    /// `record.workflow_id` and `record.run_id`, preserving backend-specific atomicity guarantees.
19    ///
20    /// # Errors
21    ///
22    /// Returns backend or serialization errors encountered while recording the visibility entry.
23    async fn record_visibility(&self, record: VisibilityRecord) -> Result<(), StoreError>;
24
25    /// Lists workflow visibility summaries matching `filter`.
26    ///
27    /// A default filter has every predicate unset and matches all workflow summaries. Pagination is
28    /// applied by backends after filtering and deterministic ordering.
29    ///
30    /// # Errors
31    ///
32    /// Returns backend or serialization errors encountered while querying visibility summaries.
33    async fn list_workflows(
34        &self,
35        filter: ListWorkflowsFilter,
36    ) -> Result<Vec<WorkflowSummary>, StoreError>;
37
38    /// Counts workflow visibility summaries matching `filter`.
39    ///
40    /// # Errors
41    ///
42    /// Returns backend or serialization errors encountered while counting visibility summaries.
43    async fn count_workflows(&self, filter: ListWorkflowsFilter) -> Result<u64, StoreError>;
44}
45
46/// Complete row payload needed to upsert a workflow visibility entry.
47#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
48pub struct VisibilityRecord {
49    /// Logical workflow identifier.
50    pub workflow_id: WorkflowId,
51    /// Concrete run identifier for this workflow execution.
52    pub run_id: RunId,
53    /// Workflow type recorded when the execution started.
54    pub workflow_type: String,
55    /// Current workflow status for visibility queries.
56    pub status: WorkflowStatus,
57    /// Timestamp recorded when the workflow execution started.
58    pub start_time: DateTime<Utc>,
59    /// Timestamp recorded when the workflow execution closed, if terminal.
60    pub close_time: Option<DateTime<Utc>>,
61    /// Typed custom search attributes indexed for visibility queries.
62    pub search_attributes: HashMap<String, SearchAttributeValue>,
63}
64
65impl From<VisibilityRecord> for WorkflowSummary {
66    fn from(record: VisibilityRecord) -> Self {
67        Self {
68            workflow_id: record.workflow_id,
69            run_id: record.run_id,
70            workflow_type: record.workflow_type,
71            status: record.status,
72            start_time: record.start_time,
73            close_time: record.close_time,
74            search_attributes: record.search_attributes,
75        }
76    }
77}
78
79/// Lightweight workflow visibility projection returned by visibility queries.
80#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
81pub struct WorkflowSummary {
82    /// Logical workflow identifier.
83    pub workflow_id: WorkflowId,
84    /// Concrete run identifier for this workflow execution.
85    pub run_id: RunId,
86    /// Workflow type recorded when the execution started.
87    pub workflow_type: String,
88    /// Current workflow status for visibility queries.
89    pub status: WorkflowStatus,
90    /// Timestamp recorded when the workflow execution started.
91    pub start_time: DateTime<Utc>,
92    /// Timestamp recorded when the workflow execution closed, if terminal.
93    pub close_time: Option<DateTime<Utc>>,
94    /// Typed custom search attributes indexed for visibility queries.
95    pub search_attributes: HashMap<String, SearchAttributeValue>,
96}
97
98/// Query input for listing and counting workflow visibility summaries.
99///
100/// A default filter has every scalar predicate unset, no search-attribute predicates, and no
101/// pagination, so it matches all workflow summaries. `closed_after` and `closed_before` only match
102/// summaries with a `close_time` value.
103#[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq)]
104pub struct ListWorkflowsFilter {
105    /// Match workflows with this workflow type exactly.
106    pub workflow_type: Option<String>,
107    /// Match workflows whose visibility status equals this status.
108    pub status: Option<WorkflowStatus>,
109    /// Match workflows started at or after this timestamp.
110    pub started_after: Option<DateTime<Utc>>,
111    /// Match workflows started at or before this timestamp.
112    pub started_before: Option<DateTime<Utc>>,
113    /// Match workflows closed at or after this timestamp; running workflows do not match.
114    pub closed_after: Option<DateTime<Utc>>,
115    /// Match workflows closed at or before this timestamp; running workflows do not match.
116    pub closed_before: Option<DateTime<Utc>>,
117    /// Typed custom search-attribute predicates that must all match.
118    pub search_attributes: Vec<SearchAttributePredicate>,
119    /// Maximum number of summaries to return after filtering and ordering.
120    pub limit: Option<u32>,
121    /// Number of summaries to skip after filtering and ordering.
122    pub offset: Option<u32>,
123}
124
125impl ListWorkflowsFilter {
126    /// Returns whether a visibility summary satisfies all predicates in this filter.
127    ///
128    /// Pagination fields are intentionally ignored here because pagination applies after filtering
129    /// and deterministic ordering in a backend.
130    #[must_use]
131    pub fn matches(&self, summary: &WorkflowSummary) -> bool {
132        self.matches_workflow_type(summary)
133            && self.matches_status(summary)
134            && self.matches_started_after(summary)
135            && self.matches_started_before(summary)
136            && self.matches_closed_after(summary)
137            && self.matches_closed_before(summary)
138            && self.matches_search_attributes(summary)
139    }
140
141    fn matches_workflow_type(&self, summary: &WorkflowSummary) -> bool {
142        self.workflow_type
143            .as_ref()
144            .is_none_or(|workflow_type| workflow_type == &summary.workflow_type)
145    }
146
147    fn matches_status(&self, summary: &WorkflowSummary) -> bool {
148        self.status.is_none_or(|status| status == summary.status)
149    }
150
151    fn matches_started_after(&self, summary: &WorkflowSummary) -> bool {
152        self.started_after
153            .is_none_or(|started_after| summary.start_time >= started_after)
154    }
155
156    fn matches_started_before(&self, summary: &WorkflowSummary) -> bool {
157        self.started_before
158            .is_none_or(|started_before| summary.start_time <= started_before)
159    }
160
161    fn matches_closed_after(&self, summary: &WorkflowSummary) -> bool {
162        self.closed_after.is_none_or(|closed_after| {
163            summary
164                .close_time
165                .is_some_and(|close_time| close_time >= closed_after)
166        })
167    }
168
169    fn matches_closed_before(&self, summary: &WorkflowSummary) -> bool {
170        self.closed_before.is_none_or(|closed_before| {
171            summary
172                .close_time
173                .is_some_and(|close_time| close_time <= closed_before)
174        })
175    }
176
177    fn matches_search_attributes(&self, summary: &WorkflowSummary) -> bool {
178        self.search_attributes
179            .iter()
180            .all(|predicate| predicate.matches(summary))
181    }
182}
183
184/// Typed predicate over one custom search attribute.
185#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
186pub enum SearchAttributePredicate {
187    /// Match when the stored attribute value equals `value` exactly.
188    Equals {
189        /// Search attribute name.
190        name: String,
191        /// Expected typed value.
192        value: SearchAttributeValue,
193    },
194    /// Match when the stored ordered attribute value is greater than `value`.
195    GreaterThan {
196        /// Search attribute name.
197        name: String,
198        /// Exclusive lower bound for the stored typed value.
199        value: SearchAttributeValue,
200    },
201    /// Match when the stored ordered attribute value is less than `value`.
202    LessThan {
203        /// Search attribute name.
204        name: String,
205        /// Exclusive upper bound for the stored typed value.
206        value: SearchAttributeValue,
207    },
208    /// Match when the stored attribute is a keyword list containing `keyword`.
209    Contains {
210        /// Search attribute name.
211        name: String,
212        /// Keyword expected to be present in the stored keyword list.
213        keyword: String,
214    },
215}
216
217impl SearchAttributePredicate {
218    /// Returns whether this predicate matches the corresponding attribute on `summary`.
219    ///
220    /// Missing attributes and mismatched typed comparisons do not match. Greater-than and less-than
221    /// comparisons are supported for integer, float, and datetime values. `Contains` matches only
222    /// stored [`SearchAttributeValue::KeywordList`] attributes.
223    #[must_use]
224    pub fn matches(&self, summary: &WorkflowSummary) -> bool {
225        match self {
226            Self::Equals { name, value } => summary
227                .search_attributes
228                .get(name)
229                .is_some_and(|stored| stored == value),
230            Self::GreaterThan { name, value } => summary
231                .search_attributes
232                .get(name)
233                .is_some_and(|stored| stored_greater_than(stored, value)),
234            Self::LessThan { name, value } => summary
235                .search_attributes
236                .get(name)
237                .is_some_and(|stored| stored_less_than(stored, value)),
238            Self::Contains { name, keyword } => {
239                summary
240                    .search_attributes
241                    .get(name)
242                    .is_some_and(|stored| match stored {
243                        SearchAttributeValue::KeywordList(keywords) => keywords.contains(keyword),
244                        _ => false,
245                    })
246            }
247        }
248    }
249}
250
251#[must_use]
252fn stored_greater_than(stored: &SearchAttributeValue, value: &SearchAttributeValue) -> bool {
253    match (stored, value) {
254        (SearchAttributeValue::Int(stored), SearchAttributeValue::Int(value)) => stored > value,
255        (SearchAttributeValue::Float(stored), SearchAttributeValue::Float(value)) => stored > value,
256        (SearchAttributeValue::Datetime(stored), SearchAttributeValue::Datetime(value)) => {
257            stored > value
258        }
259        _ => false,
260    }
261}
262
263#[must_use]
264fn stored_less_than(stored: &SearchAttributeValue, value: &SearchAttributeValue) -> bool {
265    match (stored, value) {
266        (SearchAttributeValue::Int(stored), SearchAttributeValue::Int(value)) => stored < value,
267        (SearchAttributeValue::Float(stored), SearchAttributeValue::Float(value)) => stored < value,
268        (SearchAttributeValue::Datetime(stored), SearchAttributeValue::Datetime(value)) => {
269            stored < value
270        }
271        _ => false,
272    }
273}
274
275#[cfg(test)]
276mod tests {
277    use std::{collections::HashMap, sync::Arc};
278
279    use aion_core::{RunId, SearchAttributeValue, WorkflowId, WorkflowStatus};
280    use chrono::{DateTime, Utc};
281
282    use super::{ListWorkflowsFilter, SearchAttributePredicate, VisibilityStore, WorkflowSummary};
283
284    #[test]
285    fn visibility_store_is_object_safe() {
286        let _: Option<Arc<dyn VisibilityStore>> = None;
287    }
288
289    #[test]
290    fn default_filter_matches_all_workflows() {
291        let summary = workflow_summary();
292
293        assert!(ListWorkflowsFilter::default().matches(&summary));
294    }
295
296    #[test]
297    fn mismatched_typed_search_attribute_predicates_do_not_match() {
298        let summary = workflow_summary();
299
300        let greater_than_string = SearchAttributePredicate::GreaterThan {
301            name: String::from("customer"),
302            value: SearchAttributeValue::String(String::from("a")),
303        };
304        let contains_non_keyword_list = SearchAttributePredicate::Contains {
305            name: String::from("attempts"),
306            keyword: String::from("vip"),
307        };
308
309        assert!(
310            !ListWorkflowsFilter {
311                search_attributes: vec![greater_than_string],
312                ..ListWorkflowsFilter::default()
313            }
314            .matches(&summary)
315        );
316        assert!(
317            !ListWorkflowsFilter {
318                search_attributes: vec![contains_non_keyword_list],
319                ..ListWorkflowsFilter::default()
320            }
321            .matches(&summary)
322        );
323    }
324
325    #[test]
326    fn search_attribute_predicates_match_supported_typed_operations() {
327        let summary = workflow_summary();
328
329        assert!(
330            SearchAttributePredicate::Equals {
331                name: String::from("customer"),
332                value: SearchAttributeValue::String(String::from("cust-1")),
333            }
334            .matches(&summary)
335        );
336        assert!(
337            SearchAttributePredicate::GreaterThan {
338                name: String::from("attempts"),
339                value: SearchAttributeValue::Int(2),
340            }
341            .matches(&summary)
342        );
343        assert!(
344            SearchAttributePredicate::LessThan {
345                name: String::from("attempts"),
346                value: SearchAttributeValue::Int(4),
347            }
348            .matches(&summary)
349        );
350        assert!(
351            SearchAttributePredicate::Contains {
352                name: String::from("tags"),
353                keyword: String::from("vip"),
354            }
355            .matches(&summary)
356        );
357    }
358
359    fn workflow_summary() -> WorkflowSummary {
360        let mut search_attributes = HashMap::new();
361        search_attributes.insert(
362            String::from("customer"),
363            SearchAttributeValue::String(String::from("cust-1")),
364        );
365        search_attributes.insert(String::from("attempts"), SearchAttributeValue::Int(3));
366        search_attributes.insert(
367            String::from("tags"),
368            SearchAttributeValue::KeywordList(vec![String::from("vip"), String::from("west")]),
369        );
370
371        WorkflowSummary {
372            workflow_id: WorkflowId::new_v4(),
373            run_id: RunId::new_v4(),
374            workflow_type: String::from("example"),
375            status: WorkflowStatus::Running,
376            start_time: DateTime::<Utc>::default(),
377            close_time: None,
378            search_attributes,
379        }
380    }
381}