1use std::collections::HashMap;
4use std::fmt::Write as _;
5
6use aion_core::{RunId, SearchAttributeValue, WorkflowId, WorkflowStatus};
7use aion_store::StoreError;
8use aion_store::visibility::{
9 ListWorkflowsFilter, SearchAttributePredicate, VisibilityRecord, VisibilityStore,
10 WorkflowSummary as VisibilityWorkflowSummary,
11};
12use async_trait::async_trait;
13use chrono::{DateTime, SecondsFormat, Utc};
14use libsql::{Value, params_from_iter};
15use uuid::Uuid;
16
17use crate::store::LibSqlStore;
18
/// SQL statement that writes one row into the `visibility` table.
///
/// `INSERT OR REPLACE` means a row that conflicts with an existing one on a uniqueness
/// constraint replaces it instead of failing; which columns form that constraint is decided
/// by the table schema, which is not defined in this file.
///
/// The seven positional parameters bind, in order: workflow id, run id, workflow type,
/// status, start time, close time, and search attributes.
const UPSERT_VISIBILITY_SQL: &str = "
INSERT OR REPLACE INTO visibility (
    workflow_id,
    run_id,
    workflow_type,
    status,
    start_time,
    close_time,
    search_attributes
)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)";
30
31pub(crate) async fn record_visibility(
38 conn: &libsql::Connection,
39 record: VisibilityRecord,
40) -> Result<(), StoreError> {
41 let workflow_id = record.workflow_id.to_string();
42 let run_id = record.run_id.to_string();
43 let status = encode_status(record.status)?;
44 let start_time = encode_timestamp(record.start_time);
45 let close_time = record.close_time.map(encode_timestamp);
46 let search_attributes = serde_json::to_string(&record.search_attributes)
47 .map_err(|error| crate::error::serde_json_error(&error))?;
48
49 conn.execute(
50 UPSERT_VISIBILITY_SQL,
51 (
52 workflow_id,
53 run_id,
54 record.workflow_type,
55 status,
56 start_time,
57 close_time,
58 search_attributes,
59 ),
60 )
61 .await
62 .map_err(|error| crate::error::libsql_error(&error))?;
63
64 Ok(())
65}
66
67pub(crate) async fn list_workflows(
73 conn: &libsql::Connection,
74 filter: ListWorkflowsFilter,
75) -> Result<Vec<VisibilityWorkflowSummary>, StoreError> {
76 let plan = QueryPlan::list(&filter)?;
77 let mut rows = conn
78 .query(&plan.sql, params_from_iter(plan.params))
79 .await
80 .map_err(|error| crate::error::libsql_error(&error))?;
81
82 let mut summaries = Vec::new();
83 while let Some(row) = rows
84 .next()
85 .await
86 .map_err(|error| crate::error::libsql_error(&error))?
87 {
88 summaries.push(decode_summary(&row)?);
89 }
90
91 Ok(summaries)
92}
93
94pub(crate) async fn count_workflows(
100 conn: &libsql::Connection,
101 filter: ListWorkflowsFilter,
102) -> Result<u64, StoreError> {
103 let plan = QueryPlan::count(&filter)?;
104 let mut rows = conn
105 .query(&plan.sql, params_from_iter(plan.params))
106 .await
107 .map_err(|error| crate::error::libsql_error(&error))?;
108 let row = rows
109 .next()
110 .await
111 .map_err(|error| crate::error::libsql_error(&error))?
112 .ok_or_else(|| {
113 StoreError::Backend(String::from("visibility count query returned no row"))
114 })?;
115 let count: i64 = row
116 .get(0)
117 .map_err(|error| crate::error::libsql_error(&error))?;
118
119 u64::try_from(count).map_err(|error| StoreError::Backend(error.to_string()))
120}
121
122#[async_trait]
123impl VisibilityStore for LibSqlStore {
124 async fn record_visibility(&self, record: VisibilityRecord) -> Result<(), StoreError> {
125 record_visibility(self.connection(), record).await
126 }
127
128 async fn list_workflows(
129 &self,
130 filter: ListWorkflowsFilter,
131 ) -> Result<Vec<VisibilityWorkflowSummary>, StoreError> {
132 list_workflows(self.connection(), filter).await
133 }
134
135 async fn count_workflows(&self, filter: ListWorkflowsFilter) -> Result<u64, StoreError> {
136 count_workflows(self.connection(), filter).await
137 }
138}
139
140struct QueryPlan {
141 sql: String,
142 params: Vec<Value>,
143}
144
145impl QueryPlan {
146 fn list(filter: &ListWorkflowsFilter) -> Result<Self, StoreError> {
147 let mut plan = Self::filtered(
148 "SELECT workflow_id, run_id, workflow_type, status, start_time, close_time, search_attributes FROM visibility",
149 filter,
150 )?;
151 plan.sql
152 .push_str(" ORDER BY start_time DESC, workflow_id ASC");
153 if let Some(limit) = filter.limit {
154 plan.push_param(Value::Integer(i64::from(limit)));
155 write!(plan.sql, " LIMIT ?{}", plan.params.len())
156 .map_err(|error| StoreError::Backend(error.to_string()))?;
157 }
158 if let Some(offset) = filter.offset {
159 plan.push_param(Value::Integer(i64::from(offset)));
160 write!(plan.sql, " OFFSET ?{}", plan.params.len())
161 .map_err(|error| StoreError::Backend(error.to_string()))?;
162 }
163 Ok(plan)
164 }
165
166 fn count(filter: &ListWorkflowsFilter) -> Result<Self, StoreError> {
167 Self::filtered("SELECT COUNT(*) FROM visibility", filter)
168 }
169
170 fn filtered(base_sql: &str, filter: &ListWorkflowsFilter) -> Result<Self, StoreError> {
171 let mut builder = FilterBuilder::default();
172 builder.add_filter(filter)?;
173 let (where_sql, params) = builder.finish();
174 let sql = if where_sql.is_empty() {
175 String::from(base_sql)
176 } else {
177 format!("{base_sql} WHERE {where_sql}")
178 };
179
180 Ok(Self { sql, params })
181 }
182
183 fn push_param(&mut self, value: Value) {
184 self.params.push(value);
185 }
186}
187
188#[derive(Default)]
189struct FilterBuilder {
190 clauses: Vec<String>,
191 params: Vec<Value>,
192}
193
194impl FilterBuilder {
195 fn add_filter(&mut self, filter: &ListWorkflowsFilter) -> Result<(), StoreError> {
196 if let Some(workflow_type) = &filter.workflow_type {
197 self.push_clause("workflow_type =", Value::Text(workflow_type.clone()));
198 }
199 if let Some(status) = filter.status {
200 self.push_clause("status =", Value::Text(encode_status(status)?));
201 }
202 if let Some(started_after) = filter.started_after {
203 self.push_clause(
204 "start_time >=",
205 Value::Text(encode_timestamp(started_after)),
206 );
207 }
208 if let Some(started_before) = filter.started_before {
209 self.push_clause(
210 "start_time <=",
211 Value::Text(encode_timestamp(started_before)),
212 );
213 }
214 if let Some(closed_after) = filter.closed_after {
215 self.push_clause("close_time >=", Value::Text(encode_timestamp(closed_after)));
216 }
217 if let Some(closed_before) = filter.closed_before {
218 self.push_clause(
219 "close_time <=",
220 Value::Text(encode_timestamp(closed_before)),
221 );
222 }
223 for predicate in &filter.search_attributes {
224 self.add_search_attribute_predicate(predicate)?;
225 }
226
227 Ok(())
228 }
229
230 fn push_clause(&mut self, lhs_and_operator: &str, value: Value) {
231 self.params.push(value);
232 self.clauses
233 .push(format!("{lhs_and_operator} ?{}", self.params.len()));
234 }
235
236 fn add_search_attribute_predicate(
237 &mut self,
238 predicate: &SearchAttributePredicate,
239 ) -> Result<(), StoreError> {
240 match predicate {
241 SearchAttributePredicate::Equals { name, value } => self.add_equals(name, value),
242 SearchAttributePredicate::GreaterThan { name, value } => {
243 self.add_ordered_comparison(name, value, ">")
244 }
245 SearchAttributePredicate::LessThan { name, value } => {
246 self.add_ordered_comparison(name, value, "<")
247 }
248 SearchAttributePredicate::Contains { name, keyword } => {
249 self.add_contains(name, keyword)
250 }
251 }
252 }
253
254 fn add_equals(&mut self, name: &str, value: &SearchAttributeValue) -> Result<(), StoreError> {
255 let type_path = search_attribute_path(name, "type")?;
256 let data_path = search_attribute_path(name, "data")?;
257 let type_param = self.push(Value::Text(type_name(value)));
258 let data_param = self.push(search_attribute_data_value(value)?);
259 let type_path_param = self.push(Value::Text(type_path));
260 let data_path_param = self.push(Value::Text(data_path));
261 self.clauses.push(format!(
262 "json_extract(search_attributes, ?{type_path_param}) = ?{type_param} AND json_extract(search_attributes, ?{data_path_param}) = ?{data_param}"
263 ));
264 Ok(())
265 }
266
267 fn add_ordered_comparison(
268 &mut self,
269 name: &str,
270 value: &SearchAttributeValue,
271 operator: &str,
272 ) -> Result<(), StoreError> {
273 if !is_ordered_value(value) {
274 self.clauses.push(String::from("0 = 1"));
275 return Ok(());
276 }
277
278 let type_path = search_attribute_path(name, "type")?;
279 let data_path = search_attribute_path(name, "data")?;
280 let type_param = self.push(Value::Text(type_name(value)));
281 let data_param = self.push(search_attribute_data_value(value)?);
282 let type_path_param = self.push(Value::Text(type_path));
283 let data_path_param = self.push(Value::Text(data_path));
284 self.clauses.push(format!(
285 "json_extract(search_attributes, ?{type_path_param}) = ?{type_param} AND json_extract(search_attributes, ?{data_path_param}) {operator} ?{data_param}"
286 ));
287 Ok(())
288 }
289
290 fn add_contains(&mut self, name: &str, keyword: &str) -> Result<(), StoreError> {
291 let type_path = search_attribute_path(name, "type")?;
292 let data_path = search_attribute_path(name, "data")?;
293 let type_param = self.push(Value::Text(String::from("KeywordList")));
294 let keyword_param = self.push(Value::Text(String::from(keyword)));
295 let type_path_param = self.push(Value::Text(type_path));
296 let data_path_param = self.push(Value::Text(data_path));
297 self.clauses.push(format!(
298 "json_extract(search_attributes, ?{type_path_param}) = ?{type_param} AND EXISTS (SELECT 1 FROM json_each(json_extract(search_attributes, ?{data_path_param})) WHERE value = ?{keyword_param})"
299 ));
300 Ok(())
301 }
302
303 fn push(&mut self, value: Value) -> usize {
304 self.params.push(value);
305 self.params.len()
306 }
307
308 fn finish(self) -> (String, Vec<Value>) {
309 (self.clauses.join(" AND "), self.params)
310 }
311}
312
313fn decode_summary(row: &libsql::Row) -> Result<VisibilityWorkflowSummary, StoreError> {
314 let workflow_id: String = row
315 .get(0)
316 .map_err(|error| crate::error::libsql_error(&error))?;
317 let run_id: String = row
318 .get(1)
319 .map_err(|error| crate::error::libsql_error(&error))?;
320 let workflow_type: String = row
321 .get(2)
322 .map_err(|error| crate::error::libsql_error(&error))?;
323 let status: String = row
324 .get(3)
325 .map_err(|error| crate::error::libsql_error(&error))?;
326 let start_time: String = row
327 .get(4)
328 .map_err(|error| crate::error::libsql_error(&error))?;
329 let close_time: Option<String> = row
330 .get(5)
331 .map_err(|error| crate::error::libsql_error(&error))?;
332 let search_attributes: String = row
333 .get(6)
334 .map_err(|error| crate::error::libsql_error(&error))?;
335
336 Ok(VisibilityWorkflowSummary {
337 workflow_id: decode_workflow_id(&workflow_id)?,
338 run_id: decode_run_id(&run_id)?,
339 workflow_type,
340 status: decode_status(&status)?,
341 start_time: decode_timestamp(&start_time)?,
342 close_time: close_time.as_deref().map(decode_timestamp).transpose()?,
343 search_attributes: serde_json::from_str::<HashMap<String, SearchAttributeValue>>(
344 &search_attributes,
345 )
346 .map_err(|error| crate::error::serde_json_error(&error))?,
347 })
348}
349
350fn encode_status(status: WorkflowStatus) -> Result<String, StoreError> {
351 serde_json::to_string(&status).map_err(|error| crate::error::serde_json_error(&error))
352}
353
354fn decode_status(value: &str) -> Result<WorkflowStatus, StoreError> {
355 serde_json::from_str(value).map_err(|error| crate::error::serde_json_error(&error))
356}
357
358fn encode_timestamp(timestamp: DateTime<Utc>) -> String {
359 timestamp.to_rfc3339_opts(SecondsFormat::Nanos, true)
360}
361
362fn decode_timestamp(value: &str) -> Result<DateTime<Utc>, StoreError> {
363 DateTime::parse_from_rfc3339(value)
364 .map(|date_time| date_time.with_timezone(&Utc))
365 .map_err(|error| StoreError::Serialization(error.to_string()))
366}
367
368fn decode_workflow_id(value: &str) -> Result<WorkflowId, StoreError> {
369 Uuid::parse_str(value)
370 .map(WorkflowId::new)
371 .map_err(|error| StoreError::Serialization(error.to_string()))
372}
373
374fn decode_run_id(value: &str) -> Result<RunId, StoreError> {
375 Uuid::parse_str(value)
376 .map(RunId::new)
377 .map_err(|error| StoreError::Serialization(error.to_string()))
378}
379
380fn search_attribute_path(name: &str, field: &str) -> Result<String, StoreError> {
381 let quoted_name =
382 serde_json::to_string(name).map_err(|error| crate::error::serde_json_error(&error))?;
383 Ok(format!("$.{quoted_name}.{field}"))
384}
385
386fn type_name(value: &SearchAttributeValue) -> String {
387 match value {
388 SearchAttributeValue::String(_) => String::from("String"),
389 SearchAttributeValue::Int(_) => String::from("Int"),
390 SearchAttributeValue::Float(_) => String::from("Float"),
391 SearchAttributeValue::Bool(_) => String::from("Bool"),
392 SearchAttributeValue::Datetime(_) => String::from("Datetime"),
393 SearchAttributeValue::KeywordList(_) => String::from("KeywordList"),
394 }
395}
396
397fn search_attribute_data_value(value: &SearchAttributeValue) -> Result<Value, StoreError> {
398 match value {
399 SearchAttributeValue::String(value) => Ok(Value::Text(value.clone())),
400 SearchAttributeValue::Int(value) => Ok(Value::Integer(*value)),
401 SearchAttributeValue::Float(value) => Ok(Value::Real(*value)),
402 SearchAttributeValue::Bool(value) => Ok(Value::Integer(i64::from(*value))),
403 SearchAttributeValue::Datetime(value) => {
404 Ok(Value::Text(serde_search_attribute_data(value)?))
405 }
406 SearchAttributeValue::KeywordList(values) => serde_json::to_string(values)
407 .map(Value::Text)
408 .map_err(|error| crate::error::serde_json_error(&error)),
409 }
410}
411
412fn serde_search_attribute_data(value: &DateTime<Utc>) -> Result<String, StoreError> {
413 let json = serde_json::to_value(SearchAttributeValue::Datetime(value.to_owned()))
414 .map_err(|error| crate::error::serde_json_error(&error))?;
415 json.get("data")
416 .and_then(serde_json::Value::as_str)
417 .map(String::from)
418 .ok_or_else(|| {
419 StoreError::Serialization(String::from(
420 "datetime search attribute encoded without string data",
421 ))
422 })
423}
424
425const fn is_ordered_value(value: &SearchAttributeValue) -> bool {
426 matches!(
427 value,
428 SearchAttributeValue::Int(_)
429 | SearchAttributeValue::Float(_)
430 | SearchAttributeValue::Datetime(_)
431 )
432}
433
// Integration-style tests: each one opens its own embedded database file in the system temp
// directory, applies the schema, and exercises the module-level functions directly.
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::path::PathBuf;
    use std::time::{SystemTime, UNIX_EPOCH};

    use aion_core::{RunId, SearchAttributeValue, WorkflowId, WorkflowStatus};
    use aion_store::StoreError;
    use aion_store::visibility::{
        ListWorkflowsFilter, SearchAttributePredicate, VisibilityRecord, VisibilityStore,
        WorkflowSummary as VisibilityWorkflowSummary,
    };
    use chrono::{DateTime, TimeZone, Utc};

    use super::{
        count_workflows, decode_timestamp, encode_timestamp, list_workflows, record_visibility,
    };
    use crate::config::{LibSqlConfig, LibSqlMode};

    /// Recording the same workflow/run twice must leave a single row holding the second
    /// version of the record.
    #[tokio::test]
    async fn record_visibility_inserts_and_updates_queryable_row() -> Result<(), StoreError> {
        let conn = open_test_connection("upsert").await?;
        let workflow_id = WorkflowId::new_v4();
        let run_id = RunId::new_v4();
        let mut record = visibility_record(
            workflow_id.clone(),
            run_id.clone(),
            "orders",
            WorkflowStatus::Running,
            instant(2026, 6, 1, 9, 0, 0)?,
            None,
        );
        record_visibility(&conn, record.clone()).await?;

        // Second write: same ids, but closed and with an extra search attribute.
        record.status = WorkflowStatus::Completed;
        record.close_time = Some(instant(2026, 6, 1, 10, 0, 0)?);
        record.search_attributes.insert(
            String::from("customer"),
            SearchAttributeValue::String(String::from("cust-2")),
        );
        record_visibility(&conn, record.clone()).await?;

        let summaries = list_workflows(&conn, ListWorkflowsFilter::default()).await?;
        assert_eq!(summaries.len(), 1);
        assert_eq!(summaries[0], record.into());

        Ok(())
    }

    /// Filters on workflow type, status, start-time range, and close-time range each select
    /// exactly the expected one of two seeded records.
    #[tokio::test]
    async fn list_filters_standard_fields_and_time_ranges() -> Result<(), StoreError> {
        let conn = open_test_connection("standard-filters").await?;
        let first = seed_record(
            &conn,
            "orders",
            WorkflowStatus::Completed,
            instant(2026, 6, 1, 9, 0, 0)?,
            Some(instant(2026, 6, 1, 10, 0, 0)?),
            "cust-1",
            2,
        )
        .await?;
        let second = seed_record(
            &conn,
            "billing",
            WorkflowStatus::Running,
            instant(2026, 6, 2, 9, 0, 0)?,
            None,
            "cust-2",
            5,
        )
        .await?;

        assert_eq!(
            ids(list_workflows(
                &conn,
                ListWorkflowsFilter {
                    workflow_type: Some(String::from("orders")),
                    ..ListWorkflowsFilter::default()
                },
            )
            .await?),
            vec![first.workflow_id.clone()]
        );
        assert_eq!(
            ids(list_workflows(
                &conn,
                ListWorkflowsFilter {
                    status: Some(WorkflowStatus::Running),
                    ..ListWorkflowsFilter::default()
                },
            )
            .await?),
            vec![second.workflow_id.clone()]
        );
        assert_eq!(
            ids(list_workflows(
                &conn,
                ListWorkflowsFilter {
                    started_after: Some(instant(2026, 6, 2, 0, 0, 0)?),
                    started_before: Some(instant(2026, 6, 2, 23, 59, 59)?),
                    ..ListWorkflowsFilter::default()
                },
            )
            .await?),
            vec![second.workflow_id.clone()]
        );
        // `second` has no close time, so a close-time range can only match `first`.
        assert_eq!(
            ids(list_workflows(
                &conn,
                ListWorkflowsFilter {
                    closed_after: Some(instant(2026, 6, 1, 9, 30, 0)?),
                    closed_before: Some(instant(2026, 6, 1, 10, 30, 0)?),
                    ..ListWorkflowsFilter::default()
                },
            )
            .await?),
            vec![first.workflow_id.clone()]
        );

        Ok(())
    }

    /// Each search-attribute predicate kind (equals, greater-than, less-than, contains)
    /// selects the expected seeded records.
    #[tokio::test]
    async fn list_filters_custom_search_attributes() -> Result<(), StoreError> {
        let conn = open_test_connection("custom-filters").await?;
        let first = seed_record(
            &conn,
            "orders",
            WorkflowStatus::Completed,
            instant(2026, 6, 1, 9, 0, 0)?,
            Some(instant(2026, 6, 1, 10, 0, 0)?),
            "cust-1",
            2,
        )
        .await?;
        let second = seed_record(
            &conn,
            "orders",
            WorkflowStatus::Completed,
            instant(2026, 6, 2, 9, 0, 0)?,
            Some(instant(2026, 6, 2, 10, 0, 0)?),
            "cust-2",
            5,
        )
        .await?;

        assert_eq!(
            ids(list_workflows(
                &conn,
                custom_filter(SearchAttributePredicate::Equals {
                    name: String::from("customer"),
                    value: SearchAttributeValue::String(String::from("cust-1")),
                })
            )
            .await?),
            vec![first.workflow_id.clone()]
        );
        assert_eq!(
            ids(list_workflows(
                &conn,
                custom_filter(SearchAttributePredicate::GreaterThan {
                    name: String::from("attempts"),
                    value: SearchAttributeValue::Int(3),
                })
            )
            .await?),
            vec![second.workflow_id.clone()]
        );
        assert_eq!(
            ids(list_workflows(
                &conn,
                custom_filter(SearchAttributePredicate::LessThan {
                    name: String::from("attempts"),
                    value: SearchAttributeValue::Int(3),
                })
            )
            .await?),
            vec![first.workflow_id.clone()]
        );
        // Both seeded records carry the "west" tag; the later start time is listed first.
        assert_eq!(
            ids(list_workflows(
                &conn,
                custom_filter(SearchAttributePredicate::Contains {
                    name: String::from("tags"),
                    keyword: String::from("west"),
                })
            )
            .await?),
            vec![second.workflow_id.clone(), first.workflow_id.clone()]
        );

        Ok(())
    }

    /// An empty filter lists everything newest-first, and limit/offset select a window of
    /// that ordering.
    #[tokio::test]
    async fn list_empty_filter_orders_by_start_time_desc_and_paginates() -> Result<(), StoreError> {
        let conn = open_test_connection("pagination").await?;
        let oldest = seed_record(
            &conn,
            "orders",
            WorkflowStatus::Running,
            instant(2026, 6, 1, 9, 0, 0)?,
            None,
            "cust-1",
            1,
        )
        .await?;
        let middle = seed_record(
            &conn,
            "orders",
            WorkflowStatus::Running,
            instant(2026, 6, 2, 9, 0, 0)?,
            None,
            "cust-2",
            2,
        )
        .await?;
        let newest = seed_record(
            &conn,
            "orders",
            WorkflowStatus::Running,
            instant(2026, 6, 3, 9, 0, 0)?,
            None,
            "cust-3",
            3,
        )
        .await?;

        assert_eq!(
            ids(list_workflows(&conn, ListWorkflowsFilter::default()).await?),
            vec![
                newest.workflow_id,
                middle.workflow_id.clone(),
                oldest.workflow_id
            ]
        );
        // Skipping one row and taking one row lands on the middle record.
        assert_eq!(
            ids(list_workflows(
                &conn,
                ListWorkflowsFilter {
                    limit: Some(1),
                    offset: Some(1),
                    ..ListWorkflowsFilter::default()
                },
            )
            .await?),
            vec![middle.workflow_id]
        );

        Ok(())
    }

    /// The count matches the total without a filter and agrees with the length of the listing
    /// for the same filter.
    #[tokio::test]
    async fn count_workflows_counts_total_and_filtered_matches() -> Result<(), StoreError> {
        let conn = open_test_connection("count").await?;
        seed_record(
            &conn,
            "orders",
            WorkflowStatus::Completed,
            instant(2026, 6, 1, 9, 0, 0)?,
            Some(instant(2026, 6, 1, 10, 0, 0)?),
            "cust-1",
            2,
        )
        .await?;
        seed_record(
            &conn,
            "orders",
            WorkflowStatus::Running,
            instant(2026, 6, 2, 9, 0, 0)?,
            None,
            "cust-2",
            5,
        )
        .await?;

        assert_eq!(
            count_workflows(&conn, ListWorkflowsFilter::default()).await?,
            2
        );
        let filter = ListWorkflowsFilter {
            status: Some(WorkflowStatus::Completed),
            ..ListWorkflowsFilter::default()
        };
        assert_eq!(count_workflows(&conn, filter.clone()).await?, 1);
        assert_eq!(
            usize::try_from(count_workflows(&conn, filter.clone()).await?).unwrap_or(usize::MAX),
            list_workflows(&conn, filter).await?.len()
        );

        Ok(())
    }

    /// Compile-time check that `LibSqlStore` can be used as a `dyn VisibilityStore`; the
    /// runtime assertion is incidental.
    #[tokio::test]
    async fn libsql_store_implements_visibility_store_trait() -> Result<(), StoreError> {
        let store = crate::store::LibSqlStore::open(unique_temp_path("trait")).await?;
        let store: std::sync::Arc<dyn VisibilityStore> = std::sync::Arc::new(store);

        assert_eq!(std::sync::Arc::strong_count(&store), 1);
        Ok(())
    }

    /// Encoding a timestamp and decoding the result yields the original value.
    #[test]
    fn timestamp_encoding_round_trips_losslessly() -> Result<(), StoreError> {
        let timestamp = instant(2026, 6, 3, 2, 30, 0)?;

        assert_eq!(decode_timestamp(&encode_timestamp(timestamp))?, timestamp);
        Ok(())
    }

    /// Builds a record with fresh workflow and run ids, attaches the `customer`, `attempts`,
    /// and `tags` search attributes, stores it through `conn`, and returns it.
    async fn seed_record(
        conn: &libsql::Connection,
        workflow_type: &str,
        status: WorkflowStatus,
        start_time: DateTime<Utc>,
        close_time: Option<DateTime<Utc>>,
        customer: &str,
        attempts: i64,
    ) -> Result<VisibilityRecord, StoreError> {
        let record = visibility_record(
            WorkflowId::new_v4(),
            RunId::new_v4(),
            workflow_type,
            status,
            start_time,
            close_time,
        )
        .with_customer(customer, attempts);
        record_visibility(conn, record.clone()).await?;
        Ok(record)
    }

    /// Test-only extension that lets a record be decorated fluently with search attributes.
    trait RecordBuilder {
        fn with_customer(self, customer: &str, attempts: i64) -> Self;
    }

    impl RecordBuilder for VisibilityRecord {
        /// Sets `customer` (string) and `attempts` (int), plus a fixed `tags` keyword list of
        /// `vip` and `west`.
        fn with_customer(mut self, customer: &str, attempts: i64) -> Self {
            self.search_attributes.insert(
                String::from("customer"),
                SearchAttributeValue::String(String::from(customer)),
            );
            self.search_attributes.insert(
                String::from("attempts"),
                SearchAttributeValue::Int(attempts),
            );
            self.search_attributes.insert(
                String::from("tags"),
                SearchAttributeValue::KeywordList(vec![String::from("vip"), String::from("west")]),
            );
            self
        }
    }

    /// Builds a record from the given fields with no search attributes.
    fn visibility_record(
        workflow_id: WorkflowId,
        run_id: RunId,
        workflow_type: &str,
        status: WorkflowStatus,
        start_time: DateTime<Utc>,
        close_time: Option<DateTime<Utc>>,
    ) -> VisibilityRecord {
        VisibilityRecord {
            workflow_id,
            run_id,
            workflow_type: String::from(workflow_type),
            status,
            start_time,
            close_time,
            search_attributes: HashMap::new(),
        }
    }

    /// Builds a filter whose only condition is the given search-attribute predicate.
    fn custom_filter(predicate: SearchAttributePredicate) -> ListWorkflowsFilter {
        ListWorkflowsFilter {
            search_attributes: vec![predicate],
            ..ListWorkflowsFilter::default()
        }
    }

    /// Projects summaries onto their workflow ids, preserving order.
    fn ids(summaries: Vec<VisibilityWorkflowSummary>) -> Vec<WorkflowId> {
        summaries
            .into_iter()
            .map(|summary| summary.workflow_id)
            .collect()
    }

    /// Opens an embedded database at a unique temp path for `name` and applies the schema.
    async fn open_test_connection(name: &str) -> Result<libsql::Connection, StoreError> {
        let config = LibSqlConfig {
            mode: LibSqlMode::Embedded {
                path: unique_temp_path(name),
            },
            journal_mode: None,
            synchronous: None,
            sync_interval_seconds: None,
        };
        let conn = crate::connection::open_connection(&config)
            .await?
            .connection;
        crate::schema::ensure_schema(&conn).await?;
        Ok(conn)
    }

    /// Builds a UTC timestamp from calendar fields, failing when the combination does not
    /// denote exactly one instant.
    fn instant(
        year: i32,
        month: u32,
        day: u32,
        hour: u32,
        minute: u32,
        second: u32,
    ) -> Result<DateTime<Utc>, StoreError> {
        Utc.with_ymd_and_hms(year, month, day, hour, minute, second)
            .single()
            .ok_or_else(|| StoreError::Serialization(String::from("invalid test instant")))
    }

    /// Returns a database path in the system temp directory made unique by `name`, the
    /// process id, and the current time in nanoseconds (0 if the clock reads before the
    /// Unix epoch).
    fn unique_temp_path(name: &str) -> PathBuf {
        let nanos = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map_or(0, |duration| duration.as_nanos());
        std::env::temp_dir().join(format!(
            "aion-store-libsql-visibility-{name}-{}-{nanos}.db",
            std::process::id()
        ))
    }
}