Skip to main content

aion_store_libsql/
visibility.rs

1//! Workflow visibility projection storage backed by libSQL.
2
3use std::collections::HashMap;
4use std::fmt::Write as _;
5
6use aion_core::{RunId, SearchAttributeValue, WorkflowId, WorkflowStatus};
7use aion_store::StoreError;
8use aion_store::visibility::{
9    ListWorkflowsFilter, SearchAttributePredicate, VisibilityRecord, VisibilityStore,
10    WorkflowSummary as VisibilityWorkflowSummary,
11};
12use async_trait::async_trait;
13use chrono::{DateTime, SecondsFormat, Utc};
14use libsql::{Value, params_from_iter};
15use uuid::Uuid;
16
17use crate::store::LibSqlStore;
18
/// SQL statement that writes one row into the `visibility` table, replacing any
/// existing row it conflicts with (`INSERT OR REPLACE`).
///
/// The seven positional parameters are bound in column order: `workflow_id`,
/// `run_id`, `workflow_type`, `status`, `start_time`, `close_time`,
/// `search_attributes`.
const UPSERT_VISIBILITY_SQL: &str = "
INSERT OR REPLACE INTO visibility (
    workflow_id,
    run_id,
    workflow_type,
    status,
    start_time,
    close_time,
    search_attributes
)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)";
30
31/// Upsert a complete workflow visibility projection row.
32///
33/// # Errors
34///
35/// Returns `StoreError::Serialization` when record fields cannot be encoded and
36/// `StoreError::Backend` when libSQL rejects the upsert.
37pub(crate) async fn record_visibility(
38    conn: &libsql::Connection,
39    record: VisibilityRecord,
40) -> Result<(), StoreError> {
41    let workflow_id = record.workflow_id.to_string();
42    let run_id = record.run_id.to_string();
43    let status = encode_status(record.status)?;
44    let start_time = encode_timestamp(record.start_time);
45    let close_time = record.close_time.map(encode_timestamp);
46    let search_attributes = serde_json::to_string(&record.search_attributes)
47        .map_err(|error| crate::error::serde_json_error(&error))?;
48
49    conn.execute(
50        UPSERT_VISIBILITY_SQL,
51        (
52            workflow_id,
53            run_id,
54            record.workflow_type,
55            status,
56            start_time,
57            close_time,
58            search_attributes,
59        ),
60    )
61    .await
62    .map_err(|error| crate::error::libsql_error(&error))?;
63
64    Ok(())
65}
66
67/// List workflow visibility summaries matching `filter`.
68///
69/// # Errors
70///
71/// Returns backend errors from libSQL or serialization errors when persisted rows cannot be decoded.
72pub(crate) async fn list_workflows(
73    conn: &libsql::Connection,
74    filter: ListWorkflowsFilter,
75) -> Result<Vec<VisibilityWorkflowSummary>, StoreError> {
76    let plan = QueryPlan::list(&filter)?;
77    let mut rows = conn
78        .query(&plan.sql, params_from_iter(plan.params))
79        .await
80        .map_err(|error| crate::error::libsql_error(&error))?;
81
82    let mut summaries = Vec::new();
83    while let Some(row) = rows
84        .next()
85        .await
86        .map_err(|error| crate::error::libsql_error(&error))?
87    {
88        summaries.push(decode_summary(&row)?);
89    }
90
91    Ok(summaries)
92}
93
94/// Count workflow visibility summaries matching `filter`, ignoring pagination fields.
95///
96/// # Errors
97///
98/// Returns backend errors from libSQL or serialization errors from filter encoding.
99pub(crate) async fn count_workflows(
100    conn: &libsql::Connection,
101    filter: ListWorkflowsFilter,
102) -> Result<u64, StoreError> {
103    let plan = QueryPlan::count(&filter)?;
104    let mut rows = conn
105        .query(&plan.sql, params_from_iter(plan.params))
106        .await
107        .map_err(|error| crate::error::libsql_error(&error))?;
108    let row = rows
109        .next()
110        .await
111        .map_err(|error| crate::error::libsql_error(&error))?
112        .ok_or_else(|| {
113            StoreError::Backend(String::from("visibility count query returned no row"))
114        })?;
115    let count: i64 = row
116        .get(0)
117        .map_err(|error| crate::error::libsql_error(&error))?;
118
119    u64::try_from(count).map_err(|error| StoreError::Backend(error.to_string()))
120}
121
122#[async_trait]
123impl VisibilityStore for LibSqlStore {
124    async fn record_visibility(&self, record: VisibilityRecord) -> Result<(), StoreError> {
125        record_visibility(self.connection(), record).await
126    }
127
128    async fn list_workflows(
129        &self,
130        filter: ListWorkflowsFilter,
131    ) -> Result<Vec<VisibilityWorkflowSummary>, StoreError> {
132        list_workflows(self.connection(), filter).await
133    }
134
135    async fn count_workflows(&self, filter: ListWorkflowsFilter) -> Result<u64, StoreError> {
136        count_workflows(self.connection(), filter).await
137    }
138}
139
140struct QueryPlan {
141    sql: String,
142    params: Vec<Value>,
143}
144
145impl QueryPlan {
146    fn list(filter: &ListWorkflowsFilter) -> Result<Self, StoreError> {
147        let mut plan = Self::filtered(
148            "SELECT workflow_id, run_id, workflow_type, status, start_time, close_time, search_attributes FROM visibility",
149            filter,
150        )?;
151        plan.sql
152            .push_str(" ORDER BY start_time DESC, workflow_id ASC");
153        if let Some(limit) = filter.limit {
154            plan.push_param(Value::Integer(i64::from(limit)));
155            write!(plan.sql, " LIMIT ?{}", plan.params.len())
156                .map_err(|error| StoreError::Backend(error.to_string()))?;
157        }
158        if let Some(offset) = filter.offset {
159            plan.push_param(Value::Integer(i64::from(offset)));
160            write!(plan.sql, " OFFSET ?{}", plan.params.len())
161                .map_err(|error| StoreError::Backend(error.to_string()))?;
162        }
163        Ok(plan)
164    }
165
166    fn count(filter: &ListWorkflowsFilter) -> Result<Self, StoreError> {
167        Self::filtered("SELECT COUNT(*) FROM visibility", filter)
168    }
169
170    fn filtered(base_sql: &str, filter: &ListWorkflowsFilter) -> Result<Self, StoreError> {
171        let mut builder = FilterBuilder::default();
172        builder.add_filter(filter)?;
173        let (where_sql, params) = builder.finish();
174        let sql = if where_sql.is_empty() {
175            String::from(base_sql)
176        } else {
177            format!("{base_sql} WHERE {where_sql}")
178        };
179
180        Ok(Self { sql, params })
181    }
182
183    fn push_param(&mut self, value: Value) {
184        self.params.push(value);
185    }
186}
187
188#[derive(Default)]
189struct FilterBuilder {
190    clauses: Vec<String>,
191    params: Vec<Value>,
192}
193
194impl FilterBuilder {
195    fn add_filter(&mut self, filter: &ListWorkflowsFilter) -> Result<(), StoreError> {
196        if let Some(workflow_type) = &filter.workflow_type {
197            self.push_clause("workflow_type =", Value::Text(workflow_type.clone()));
198        }
199        if let Some(status) = filter.status {
200            self.push_clause("status =", Value::Text(encode_status(status)?));
201        }
202        if let Some(started_after) = filter.started_after {
203            self.push_clause(
204                "start_time >=",
205                Value::Text(encode_timestamp(started_after)),
206            );
207        }
208        if let Some(started_before) = filter.started_before {
209            self.push_clause(
210                "start_time <=",
211                Value::Text(encode_timestamp(started_before)),
212            );
213        }
214        if let Some(closed_after) = filter.closed_after {
215            self.push_clause("close_time >=", Value::Text(encode_timestamp(closed_after)));
216        }
217        if let Some(closed_before) = filter.closed_before {
218            self.push_clause(
219                "close_time <=",
220                Value::Text(encode_timestamp(closed_before)),
221            );
222        }
223        for predicate in &filter.search_attributes {
224            self.add_search_attribute_predicate(predicate)?;
225        }
226
227        Ok(())
228    }
229
230    fn push_clause(&mut self, lhs_and_operator: &str, value: Value) {
231        self.params.push(value);
232        self.clauses
233            .push(format!("{lhs_and_operator} ?{}", self.params.len()));
234    }
235
236    fn add_search_attribute_predicate(
237        &mut self,
238        predicate: &SearchAttributePredicate,
239    ) -> Result<(), StoreError> {
240        match predicate {
241            SearchAttributePredicate::Equals { name, value } => self.add_equals(name, value),
242            SearchAttributePredicate::GreaterThan { name, value } => {
243                self.add_ordered_comparison(name, value, ">")
244            }
245            SearchAttributePredicate::LessThan { name, value } => {
246                self.add_ordered_comparison(name, value, "<")
247            }
248            SearchAttributePredicate::Contains { name, keyword } => {
249                self.add_contains(name, keyword)
250            }
251        }
252    }
253
254    fn add_equals(&mut self, name: &str, value: &SearchAttributeValue) -> Result<(), StoreError> {
255        let type_path = search_attribute_path(name, "type")?;
256        let data_path = search_attribute_path(name, "data")?;
257        let type_param = self.push(Value::Text(type_name(value)));
258        let data_param = self.push(search_attribute_data_value(value)?);
259        let type_path_param = self.push(Value::Text(type_path));
260        let data_path_param = self.push(Value::Text(data_path));
261        self.clauses.push(format!(
262            "json_extract(search_attributes, ?{type_path_param}) = ?{type_param} AND json_extract(search_attributes, ?{data_path_param}) = ?{data_param}"
263        ));
264        Ok(())
265    }
266
267    fn add_ordered_comparison(
268        &mut self,
269        name: &str,
270        value: &SearchAttributeValue,
271        operator: &str,
272    ) -> Result<(), StoreError> {
273        if !is_ordered_value(value) {
274            self.clauses.push(String::from("0 = 1"));
275            return Ok(());
276        }
277
278        let type_path = search_attribute_path(name, "type")?;
279        let data_path = search_attribute_path(name, "data")?;
280        let type_param = self.push(Value::Text(type_name(value)));
281        let data_param = self.push(search_attribute_data_value(value)?);
282        let type_path_param = self.push(Value::Text(type_path));
283        let data_path_param = self.push(Value::Text(data_path));
284        self.clauses.push(format!(
285            "json_extract(search_attributes, ?{type_path_param}) = ?{type_param} AND json_extract(search_attributes, ?{data_path_param}) {operator} ?{data_param}"
286        ));
287        Ok(())
288    }
289
290    fn add_contains(&mut self, name: &str, keyword: &str) -> Result<(), StoreError> {
291        let type_path = search_attribute_path(name, "type")?;
292        let data_path = search_attribute_path(name, "data")?;
293        let type_param = self.push(Value::Text(String::from("KeywordList")));
294        let keyword_param = self.push(Value::Text(String::from(keyword)));
295        let type_path_param = self.push(Value::Text(type_path));
296        let data_path_param = self.push(Value::Text(data_path));
297        self.clauses.push(format!(
298            "json_extract(search_attributes, ?{type_path_param}) = ?{type_param} AND EXISTS (SELECT 1 FROM json_each(json_extract(search_attributes, ?{data_path_param})) WHERE value = ?{keyword_param})"
299        ));
300        Ok(())
301    }
302
303    fn push(&mut self, value: Value) -> usize {
304        self.params.push(value);
305        self.params.len()
306    }
307
308    fn finish(self) -> (String, Vec<Value>) {
309        (self.clauses.join(" AND "), self.params)
310    }
311}
312
313fn decode_summary(row: &libsql::Row) -> Result<VisibilityWorkflowSummary, StoreError> {
314    let workflow_id: String = row
315        .get(0)
316        .map_err(|error| crate::error::libsql_error(&error))?;
317    let run_id: String = row
318        .get(1)
319        .map_err(|error| crate::error::libsql_error(&error))?;
320    let workflow_type: String = row
321        .get(2)
322        .map_err(|error| crate::error::libsql_error(&error))?;
323    let status: String = row
324        .get(3)
325        .map_err(|error| crate::error::libsql_error(&error))?;
326    let start_time: String = row
327        .get(4)
328        .map_err(|error| crate::error::libsql_error(&error))?;
329    let close_time: Option<String> = row
330        .get(5)
331        .map_err(|error| crate::error::libsql_error(&error))?;
332    let search_attributes: String = row
333        .get(6)
334        .map_err(|error| crate::error::libsql_error(&error))?;
335
336    Ok(VisibilityWorkflowSummary {
337        workflow_id: decode_workflow_id(&workflow_id)?,
338        run_id: decode_run_id(&run_id)?,
339        workflow_type,
340        status: decode_status(&status)?,
341        start_time: decode_timestamp(&start_time)?,
342        close_time: close_time.as_deref().map(decode_timestamp).transpose()?,
343        search_attributes: serde_json::from_str::<HashMap<String, SearchAttributeValue>>(
344            &search_attributes,
345        )
346        .map_err(|error| crate::error::serde_json_error(&error))?,
347    })
348}
349
350fn encode_status(status: WorkflowStatus) -> Result<String, StoreError> {
351    serde_json::to_string(&status).map_err(|error| crate::error::serde_json_error(&error))
352}
353
354fn decode_status(value: &str) -> Result<WorkflowStatus, StoreError> {
355    serde_json::from_str(value).map_err(|error| crate::error::serde_json_error(&error))
356}
357
358fn encode_timestamp(timestamp: DateTime<Utc>) -> String {
359    timestamp.to_rfc3339_opts(SecondsFormat::Nanos, true)
360}
361
362fn decode_timestamp(value: &str) -> Result<DateTime<Utc>, StoreError> {
363    DateTime::parse_from_rfc3339(value)
364        .map(|date_time| date_time.with_timezone(&Utc))
365        .map_err(|error| StoreError::Serialization(error.to_string()))
366}
367
368fn decode_workflow_id(value: &str) -> Result<WorkflowId, StoreError> {
369    Uuid::parse_str(value)
370        .map(WorkflowId::new)
371        .map_err(|error| StoreError::Serialization(error.to_string()))
372}
373
374fn decode_run_id(value: &str) -> Result<RunId, StoreError> {
375    Uuid::parse_str(value)
376        .map(RunId::new)
377        .map_err(|error| StoreError::Serialization(error.to_string()))
378}
379
380fn search_attribute_path(name: &str, field: &str) -> Result<String, StoreError> {
381    let quoted_name =
382        serde_json::to_string(name).map_err(|error| crate::error::serde_json_error(&error))?;
383    Ok(format!("$.{quoted_name}.{field}"))
384}
385
386fn type_name(value: &SearchAttributeValue) -> String {
387    match value {
388        SearchAttributeValue::String(_) => String::from("String"),
389        SearchAttributeValue::Int(_) => String::from("Int"),
390        SearchAttributeValue::Float(_) => String::from("Float"),
391        SearchAttributeValue::Bool(_) => String::from("Bool"),
392        SearchAttributeValue::Datetime(_) => String::from("Datetime"),
393        SearchAttributeValue::KeywordList(_) => String::from("KeywordList"),
394    }
395}
396
397fn search_attribute_data_value(value: &SearchAttributeValue) -> Result<Value, StoreError> {
398    match value {
399        SearchAttributeValue::String(value) => Ok(Value::Text(value.clone())),
400        SearchAttributeValue::Int(value) => Ok(Value::Integer(*value)),
401        SearchAttributeValue::Float(value) => Ok(Value::Real(*value)),
402        SearchAttributeValue::Bool(value) => Ok(Value::Integer(i64::from(*value))),
403        SearchAttributeValue::Datetime(value) => {
404            Ok(Value::Text(serde_search_attribute_data(value)?))
405        }
406        SearchAttributeValue::KeywordList(values) => serde_json::to_string(values)
407            .map(Value::Text)
408            .map_err(|error| crate::error::serde_json_error(&error)),
409    }
410}
411
412fn serde_search_attribute_data(value: &DateTime<Utc>) -> Result<String, StoreError> {
413    let json = serde_json::to_value(SearchAttributeValue::Datetime(value.to_owned()))
414        .map_err(|error| crate::error::serde_json_error(&error))?;
415    json.get("data")
416        .and_then(serde_json::Value::as_str)
417        .map(String::from)
418        .ok_or_else(|| {
419            StoreError::Serialization(String::from(
420                "datetime search attribute encoded without string data",
421            ))
422        })
423}
424
425const fn is_ordered_value(value: &SearchAttributeValue) -> bool {
426    matches!(
427        value,
428        SearchAttributeValue::Int(_)
429            | SearchAttributeValue::Float(_)
430            | SearchAttributeValue::Datetime(_)
431    )
432}
433
434#[cfg(test)]
435mod tests {
436    use std::collections::HashMap;
437    use std::path::PathBuf;
438    use std::time::{SystemTime, UNIX_EPOCH};
439
440    use aion_core::{RunId, SearchAttributeValue, WorkflowId, WorkflowStatus};
441    use aion_store::StoreError;
442    use aion_store::visibility::{
443        ListWorkflowsFilter, SearchAttributePredicate, VisibilityRecord, VisibilityStore,
444        WorkflowSummary as VisibilityWorkflowSummary,
445    };
446    use chrono::{DateTime, TimeZone, Utc};
447
448    use super::{
449        count_workflows, decode_timestamp, encode_timestamp, list_workflows, record_visibility,
450    };
451    use crate::config::{LibSqlConfig, LibSqlMode};
452
453    #[tokio::test]
454    async fn record_visibility_inserts_and_updates_queryable_row() -> Result<(), StoreError> {
455        let conn = open_test_connection("upsert").await?;
456        let workflow_id = WorkflowId::new_v4();
457        let run_id = RunId::new_v4();
458        let mut record = visibility_record(
459            workflow_id.clone(),
460            run_id.clone(),
461            "orders",
462            WorkflowStatus::Running,
463            instant(2026, 6, 1, 9, 0, 0)?,
464            None,
465        );
466        record_visibility(&conn, record.clone()).await?;
467
468        record.status = WorkflowStatus::Completed;
469        record.close_time = Some(instant(2026, 6, 1, 10, 0, 0)?);
470        record.search_attributes.insert(
471            String::from("customer"),
472            SearchAttributeValue::String(String::from("cust-2")),
473        );
474        record_visibility(&conn, record.clone()).await?;
475
476        let summaries = list_workflows(&conn, ListWorkflowsFilter::default()).await?;
477        assert_eq!(summaries.len(), 1);
478        assert_eq!(summaries[0], record.into());
479
480        Ok(())
481    }
482
483    #[tokio::test]
484    async fn list_filters_standard_fields_and_time_ranges() -> Result<(), StoreError> {
485        let conn = open_test_connection("standard-filters").await?;
486        let first = seed_record(
487            &conn,
488            "orders",
489            WorkflowStatus::Completed,
490            instant(2026, 6, 1, 9, 0, 0)?,
491            Some(instant(2026, 6, 1, 10, 0, 0)?),
492            "cust-1",
493            2,
494        )
495        .await?;
496        let second = seed_record(
497            &conn,
498            "billing",
499            WorkflowStatus::Running,
500            instant(2026, 6, 2, 9, 0, 0)?,
501            None,
502            "cust-2",
503            5,
504        )
505        .await?;
506
507        assert_eq!(
508            ids(list_workflows(
509                &conn,
510                ListWorkflowsFilter {
511                    workflow_type: Some(String::from("orders")),
512                    ..ListWorkflowsFilter::default()
513                },
514            )
515            .await?),
516            vec![first.workflow_id.clone()]
517        );
518        assert_eq!(
519            ids(list_workflows(
520                &conn,
521                ListWorkflowsFilter {
522                    status: Some(WorkflowStatus::Running),
523                    ..ListWorkflowsFilter::default()
524                },
525            )
526            .await?),
527            vec![second.workflow_id.clone()]
528        );
529        assert_eq!(
530            ids(list_workflows(
531                &conn,
532                ListWorkflowsFilter {
533                    started_after: Some(instant(2026, 6, 2, 0, 0, 0)?),
534                    started_before: Some(instant(2026, 6, 2, 23, 59, 59)?),
535                    ..ListWorkflowsFilter::default()
536                },
537            )
538            .await?),
539            vec![second.workflow_id.clone()]
540        );
541        assert_eq!(
542            ids(list_workflows(
543                &conn,
544                ListWorkflowsFilter {
545                    closed_after: Some(instant(2026, 6, 1, 9, 30, 0)?),
546                    closed_before: Some(instant(2026, 6, 1, 10, 30, 0)?),
547                    ..ListWorkflowsFilter::default()
548                },
549            )
550            .await?),
551            vec![first.workflow_id.clone()]
552        );
553
554        Ok(())
555    }
556
557    #[tokio::test]
558    async fn list_filters_custom_search_attributes() -> Result<(), StoreError> {
559        let conn = open_test_connection("custom-filters").await?;
560        let first = seed_record(
561            &conn,
562            "orders",
563            WorkflowStatus::Completed,
564            instant(2026, 6, 1, 9, 0, 0)?,
565            Some(instant(2026, 6, 1, 10, 0, 0)?),
566            "cust-1",
567            2,
568        )
569        .await?;
570        let second = seed_record(
571            &conn,
572            "orders",
573            WorkflowStatus::Completed,
574            instant(2026, 6, 2, 9, 0, 0)?,
575            Some(instant(2026, 6, 2, 10, 0, 0)?),
576            "cust-2",
577            5,
578        )
579        .await?;
580
581        assert_eq!(
582            ids(list_workflows(
583                &conn,
584                custom_filter(SearchAttributePredicate::Equals {
585                    name: String::from("customer"),
586                    value: SearchAttributeValue::String(String::from("cust-1")),
587                })
588            )
589            .await?),
590            vec![first.workflow_id.clone()]
591        );
592        assert_eq!(
593            ids(list_workflows(
594                &conn,
595                custom_filter(SearchAttributePredicate::GreaterThan {
596                    name: String::from("attempts"),
597                    value: SearchAttributeValue::Int(3),
598                })
599            )
600            .await?),
601            vec![second.workflow_id.clone()]
602        );
603        assert_eq!(
604            ids(list_workflows(
605                &conn,
606                custom_filter(SearchAttributePredicate::LessThan {
607                    name: String::from("attempts"),
608                    value: SearchAttributeValue::Int(3),
609                })
610            )
611            .await?),
612            vec![first.workflow_id.clone()]
613        );
614        assert_eq!(
615            ids(list_workflows(
616                &conn,
617                custom_filter(SearchAttributePredicate::Contains {
618                    name: String::from("tags"),
619                    keyword: String::from("west"),
620                })
621            )
622            .await?),
623            vec![second.workflow_id.clone(), first.workflow_id.clone()]
624        );
625
626        Ok(())
627    }
628
629    #[tokio::test]
630    async fn list_empty_filter_orders_by_start_time_desc_and_paginates() -> Result<(), StoreError> {
631        let conn = open_test_connection("pagination").await?;
632        let oldest = seed_record(
633            &conn,
634            "orders",
635            WorkflowStatus::Running,
636            instant(2026, 6, 1, 9, 0, 0)?,
637            None,
638            "cust-1",
639            1,
640        )
641        .await?;
642        let middle = seed_record(
643            &conn,
644            "orders",
645            WorkflowStatus::Running,
646            instant(2026, 6, 2, 9, 0, 0)?,
647            None,
648            "cust-2",
649            2,
650        )
651        .await?;
652        let newest = seed_record(
653            &conn,
654            "orders",
655            WorkflowStatus::Running,
656            instant(2026, 6, 3, 9, 0, 0)?,
657            None,
658            "cust-3",
659            3,
660        )
661        .await?;
662
663        assert_eq!(
664            ids(list_workflows(&conn, ListWorkflowsFilter::default()).await?),
665            vec![
666                newest.workflow_id,
667                middle.workflow_id.clone(),
668                oldest.workflow_id
669            ]
670        );
671        assert_eq!(
672            ids(list_workflows(
673                &conn,
674                ListWorkflowsFilter {
675                    limit: Some(1),
676                    offset: Some(1),
677                    ..ListWorkflowsFilter::default()
678                },
679            )
680            .await?),
681            vec![middle.workflow_id]
682        );
683
684        Ok(())
685    }
686
687    #[tokio::test]
688    async fn count_workflows_counts_total_and_filtered_matches() -> Result<(), StoreError> {
689        let conn = open_test_connection("count").await?;
690        seed_record(
691            &conn,
692            "orders",
693            WorkflowStatus::Completed,
694            instant(2026, 6, 1, 9, 0, 0)?,
695            Some(instant(2026, 6, 1, 10, 0, 0)?),
696            "cust-1",
697            2,
698        )
699        .await?;
700        seed_record(
701            &conn,
702            "orders",
703            WorkflowStatus::Running,
704            instant(2026, 6, 2, 9, 0, 0)?,
705            None,
706            "cust-2",
707            5,
708        )
709        .await?;
710
711        assert_eq!(
712            count_workflows(&conn, ListWorkflowsFilter::default()).await?,
713            2
714        );
715        let filter = ListWorkflowsFilter {
716            status: Some(WorkflowStatus::Completed),
717            ..ListWorkflowsFilter::default()
718        };
719        assert_eq!(count_workflows(&conn, filter.clone()).await?, 1);
720        assert_eq!(
721            usize::try_from(count_workflows(&conn, filter.clone()).await?).unwrap_or(usize::MAX),
722            list_workflows(&conn, filter).await?.len()
723        );
724
725        Ok(())
726    }
727
728    #[tokio::test]
729    async fn libsql_store_implements_visibility_store_trait() -> Result<(), StoreError> {
730        let store = crate::store::LibSqlStore::open(unique_temp_path("trait")).await?;
731        let store: std::sync::Arc<dyn VisibilityStore> = std::sync::Arc::new(store);
732
733        assert_eq!(std::sync::Arc::strong_count(&store), 1);
734        Ok(())
735    }
736
737    #[test]
738    fn timestamp_encoding_round_trips_losslessly() -> Result<(), StoreError> {
739        let timestamp = instant(2026, 6, 3, 2, 30, 0)?;
740
741        assert_eq!(decode_timestamp(&encode_timestamp(timestamp))?, timestamp);
742        Ok(())
743    }
744
745    async fn seed_record(
746        conn: &libsql::Connection,
747        workflow_type: &str,
748        status: WorkflowStatus,
749        start_time: DateTime<Utc>,
750        close_time: Option<DateTime<Utc>>,
751        customer: &str,
752        attempts: i64,
753    ) -> Result<VisibilityRecord, StoreError> {
754        let record = visibility_record(
755            WorkflowId::new_v4(),
756            RunId::new_v4(),
757            workflow_type,
758            status,
759            start_time,
760            close_time,
761        )
762        .with_customer(customer, attempts);
763        record_visibility(conn, record.clone()).await?;
764        Ok(record)
765    }
766
767    trait RecordBuilder {
768        fn with_customer(self, customer: &str, attempts: i64) -> Self;
769    }
770
771    impl RecordBuilder for VisibilityRecord {
772        fn with_customer(mut self, customer: &str, attempts: i64) -> Self {
773            self.search_attributes.insert(
774                String::from("customer"),
775                SearchAttributeValue::String(String::from(customer)),
776            );
777            self.search_attributes.insert(
778                String::from("attempts"),
779                SearchAttributeValue::Int(attempts),
780            );
781            self.search_attributes.insert(
782                String::from("tags"),
783                SearchAttributeValue::KeywordList(vec![String::from("vip"), String::from("west")]),
784            );
785            self
786        }
787    }
788
789    fn visibility_record(
790        workflow_id: WorkflowId,
791        run_id: RunId,
792        workflow_type: &str,
793        status: WorkflowStatus,
794        start_time: DateTime<Utc>,
795        close_time: Option<DateTime<Utc>>,
796    ) -> VisibilityRecord {
797        VisibilityRecord {
798            workflow_id,
799            run_id,
800            workflow_type: String::from(workflow_type),
801            status,
802            start_time,
803            close_time,
804            search_attributes: HashMap::new(),
805        }
806    }
807
808    fn custom_filter(predicate: SearchAttributePredicate) -> ListWorkflowsFilter {
809        ListWorkflowsFilter {
810            search_attributes: vec![predicate],
811            ..ListWorkflowsFilter::default()
812        }
813    }
814
815    fn ids(summaries: Vec<VisibilityWorkflowSummary>) -> Vec<WorkflowId> {
816        summaries
817            .into_iter()
818            .map(|summary| summary.workflow_id)
819            .collect()
820    }
821
822    async fn open_test_connection(name: &str) -> Result<libsql::Connection, StoreError> {
823        let config = LibSqlConfig {
824            mode: LibSqlMode::Embedded {
825                path: unique_temp_path(name),
826            },
827            journal_mode: None,
828            synchronous: None,
829            sync_interval_seconds: None,
830        };
831        let conn = crate::connection::open_connection(&config)
832            .await?
833            .connection;
834        crate::schema::ensure_schema(&conn).await?;
835        Ok(conn)
836    }
837
838    fn instant(
839        year: i32,
840        month: u32,
841        day: u32,
842        hour: u32,
843        minute: u32,
844        second: u32,
845    ) -> Result<DateTime<Utc>, StoreError> {
846        Utc.with_ymd_and_hms(year, month, day, hour, minute, second)
847            .single()
848            .ok_or_else(|| StoreError::Serialization(String::from("invalid test instant")))
849    }
850
851    fn unique_temp_path(name: &str) -> PathBuf {
852        let nanos = SystemTime::now()
853            .duration_since(UNIX_EPOCH)
854            .map_or(0, |duration| duration.as_nanos());
855        std::env::temp_dir().join(format!(
856            "aion-store-libsql-visibility-{name}-{}-{nanos}.db",
857            std::process::id()
858        ))
859    }
860}