Skip to main content

aion_store/
memory.rs

1//! `InMemoryStore` reference implementation and behavioural test suite.
2
3use std::collections::HashMap;
4use std::sync::{Mutex, MutexGuard};
5
6use aion_core::{
7    Event, TimerId, WorkflowFilter, WorkflowId, WorkflowStatus, WorkflowSummary, status_from_events,
8};
9use async_trait::async_trait;
10use chrono::{DateTime, Utc};
11
12use crate::package::{PackageRecord, PackageRouteRecord, PackageStore};
13use crate::visibility::{ListWorkflowsFilter, VisibilityRecord, VisibilityStore};
14use crate::{
15    ReadableEventStore, RunSummary, StoreError, TimerEntry, WritableEventStore, WriteToken,
16};
17
18/// Correct non-durable [`crate::EventStore`] implementation for tests and backend equivalence.
19#[derive(Debug, Default)]
20pub struct InMemoryStore {
21    state: Mutex<InMemoryState>,
22}
23
24#[async_trait]
25impl VisibilityStore for InMemoryStore {
26    async fn record_visibility(&self, record: VisibilityRecord) -> Result<(), StoreError> {
27        let mut state = self.lock_state()?;
28        state
29            .visibility
30            .insert((record.workflow_id.clone(), record.run_id.clone()), record);
31        Ok(())
32    }
33
34    async fn list_workflows(
35        &self,
36        filter: ListWorkflowsFilter,
37    ) -> Result<Vec<crate::visibility::WorkflowSummary>, StoreError> {
38        let state = self.lock_state()?;
39        let mut summaries = state
40            .visibility
41            .values()
42            .cloned()
43            .map(crate::visibility::WorkflowSummary::from)
44            .filter(|summary| filter.matches(summary))
45            .collect::<Vec<_>>();
46        summaries.sort_by(|left, right| {
47            left.start_time.cmp(&right.start_time).then_with(|| {
48                left.workflow_id
49                    .to_string()
50                    .cmp(&right.workflow_id.to_string())
51            })
52        });
53        let offset = filter.offset.and_then(|value| usize::try_from(value).ok());
54        if let Some(offset) = offset {
55            summaries = summaries.into_iter().skip(offset).collect();
56        }
57        if let Some(limit) = filter.limit.and_then(|value| usize::try_from(value).ok()) {
58            summaries.truncate(limit);
59        }
60        Ok(summaries)
61    }
62
63    async fn count_workflows(&self, filter: ListWorkflowsFilter) -> Result<u64, StoreError> {
64        let state = self.lock_state()?;
65        Ok(state
66            .visibility
67            .values()
68            .cloned()
69            .map(crate::visibility::WorkflowSummary::from)
70            .filter(|summary| filter.matches(summary))
71            .count()
72            .try_into()
73            .unwrap_or(u64::MAX))
74    }
75}
76
77#[derive(Debug, Default)]
78struct InMemoryState {
79    histories: HashMap<WorkflowId, Vec<Event>>,
80    timers: HashMap<(WorkflowId, TimerId), TimerEntry>,
81    visibility: HashMap<(WorkflowId, aion_core::RunId), VisibilityRecord>,
82    packages: HashMap<(String, String), PackageRecord>,
83    package_routes: HashMap<String, String>,
84}
85
86impl InMemoryStore {
87    fn lock_state(&self) -> Result<MutexGuard<'_, InMemoryState>, StoreError> {
88        self.state
89            .lock()
90            .map_err(|error| StoreError::Backend(format!("in-memory store lock poisoned: {error}")))
91    }
92}
93
94fn history_head(history: &[Event]) -> u64 {
95    history.iter().map(Event::seq).max().unwrap_or_default()
96}
97
98fn history_in_sequence_order(history: &[Event]) -> Vec<Event> {
99    let mut ordered = history.to_vec();
100    ordered.sort_by_key(Event::seq);
101    ordered
102}
103
104#[async_trait]
105impl PackageStore for InMemoryStore {
106    async fn put_package(&self, record: PackageRecord) -> Result<(), StoreError> {
107        let mut state = self.lock_state()?;
108        state
109            .package_routes
110            .insert(record.workflow_type.clone(), record.content_hash.clone());
111        state.packages.insert(
112            (record.workflow_type.clone(), record.content_hash.clone()),
113            record,
114        );
115        Ok(())
116    }
117
118    async fn list_packages(&self) -> Result<Vec<PackageRecord>, StoreError> {
119        let state = self.lock_state()?;
120        let mut records: Vec<PackageRecord> = state.packages.values().cloned().collect();
121        records.sort_by(|left, right| {
122            left.deployed_at
123                .cmp(&right.deployed_at)
124                .then_with(|| left.workflow_type.cmp(&right.workflow_type))
125                .then_with(|| left.content_hash.cmp(&right.content_hash))
126        });
127        Ok(records)
128    }
129
130    async fn delete_package(
131        &self,
132        workflow_type: &str,
133        content_hash: &str,
134    ) -> Result<(), StoreError> {
135        let mut state = self.lock_state()?;
136        state
137            .packages
138            .remove(&(workflow_type.to_owned(), content_hash.to_owned()));
139        Ok(())
140    }
141
142    async fn put_package_route(
143        &self,
144        workflow_type: &str,
145        content_hash: &str,
146    ) -> Result<(), StoreError> {
147        let mut state = self.lock_state()?;
148        state
149            .package_routes
150            .insert(workflow_type.to_owned(), content_hash.to_owned());
151        Ok(())
152    }
153
154    async fn list_package_routes(&self) -> Result<Vec<PackageRouteRecord>, StoreError> {
155        let state = self.lock_state()?;
156        let mut routes: Vec<PackageRouteRecord> = state
157            .package_routes
158            .iter()
159            .map(|(workflow_type, content_hash)| PackageRouteRecord {
160                workflow_type: workflow_type.clone(),
161                content_hash: content_hash.clone(),
162            })
163            .collect();
164        routes.sort_by(|left, right| left.workflow_type.cmp(&right.workflow_type));
165        Ok(routes)
166    }
167}
168
169#[async_trait]
170impl WritableEventStore for InMemoryStore {
171    async fn append(
172        &self,
173        _token: WriteToken,
174        workflow_id: &WorkflowId,
175        events: &[Event],
176        expected_seq: u64,
177    ) -> Result<(), StoreError> {
178        let mut state = self.lock_state()?;
179        let current_head = state
180            .histories
181            .get(workflow_id)
182            .map_or(0, |history| history_head(history));
183
184        if current_head != expected_seq {
185            return Err(StoreError::SequenceConflict {
186                expected: expected_seq,
187                found: current_head,
188            });
189        }
190
191        if events.is_empty() {
192            return Ok(());
193        }
194
195        let mut next_seq = expected_seq + 1;
196        for event in events {
197            if event.seq() != next_seq {
198                return Err(StoreError::Backend(format!(
199                    "event sequence must be contiguous: expected {next_seq}, got {}",
200                    event.seq()
201                )));
202            }
203            next_seq += 1;
204        }
205
206        state
207            .histories
208            .entry(workflow_id.clone())
209            .or_default()
210            .extend(events.iter().cloned());
211        Ok(())
212    }
213}
214
215#[async_trait]
216impl ReadableEventStore for InMemoryStore {
217    async fn read_history(&self, workflow_id: &WorkflowId) -> Result<Vec<Event>, StoreError> {
218        let state = self.lock_state()?;
219        Ok(state
220            .histories
221            .get(workflow_id)
222            .map_or_else(Vec::new, |history| history_in_sequence_order(history)))
223    }
224
225    async fn read_history_from(
226        &self,
227        workflow_id: &WorkflowId,
228        from_seq: u64,
229    ) -> Result<Vec<Event>, StoreError> {
230        let state = self.lock_state()?;
231        Ok(state
232            .histories
233            .get(workflow_id)
234            .map_or_else(Vec::new, |history| {
235                let mut events = history
236                    .iter()
237                    .filter(|event| event.seq() >= from_seq)
238                    .cloned()
239                    .collect::<Vec<_>>();
240                events.sort_by_key(Event::seq);
241                events
242            }))
243    }
244
245    async fn read_run_chain(
246        &self,
247        workflow_id: &WorkflowId,
248    ) -> Result<Vec<RunSummary>, StoreError> {
249        let state = self.lock_state()?;
250        let Some(history) = state.histories.get(workflow_id) else {
251            return Ok(Vec::new());
252        };
253
254        crate::run_chain::run_chain_from_history(history)
255    }
256
257    async fn list_workflow_ids(&self) -> Result<Vec<WorkflowId>, StoreError> {
258        let state = self.lock_state()?;
259        let mut workflow_ids = state.histories.keys().cloned().collect::<Vec<_>>();
260        workflow_ids.sort_by_key(ToString::to_string);
261        Ok(workflow_ids)
262    }
263
264    async fn list_active(&self) -> Result<Vec<WorkflowId>, StoreError> {
265        let state = self.lock_state()?;
266        let mut active = state
267            .histories
268            .iter()
269            .filter(|(_, history)| {
270                matches!(
271                    status_from_events(&history_in_sequence_order(history)),
272                    WorkflowStatus::Running
273                )
274            })
275            .map(|(workflow_id, _)| workflow_id.clone())
276            .collect::<Vec<_>>();
277        active.sort_by_key(ToString::to_string);
278        Ok(active)
279    }
280
281    async fn query(&self, filter: &WorkflowFilter) -> Result<Vec<WorkflowSummary>, StoreError> {
282        let state = self.lock_state()?;
283        let mut summaries = state
284            .histories
285            .values()
286            .filter_map(|history| {
287                WorkflowSummary::from_history(&history_in_sequence_order(history))
288            })
289            .filter(|summary| filter.matches(summary))
290            .collect::<Vec<_>>();
291        summaries.sort_by(|left, right| {
292            left.started_at.cmp(&right.started_at).then_with(|| {
293                left.workflow_id
294                    .to_string()
295                    .cmp(&right.workflow_id.to_string())
296            })
297        });
298        Ok(summaries)
299    }
300
301    async fn schedule_timer(
302        &self,
303        workflow_id: &WorkflowId,
304        timer_id: &TimerId,
305        fire_at: DateTime<Utc>,
306    ) -> Result<(), StoreError> {
307        let mut state = self.lock_state()?;
308        state.timers.insert(
309            (workflow_id.clone(), timer_id.clone()),
310            TimerEntry {
311                workflow_id: workflow_id.clone(),
312                timer_id: timer_id.clone(),
313                fire_at,
314            },
315        );
316        Ok(())
317    }
318
319    async fn expired_timers(&self, as_of: DateTime<Utc>) -> Result<Vec<TimerEntry>, StoreError> {
320        let state = self.lock_state()?;
321        let mut timers = state
322            .timers
323            .values()
324            .filter(|entry| entry.fire_at <= as_of)
325            .cloned()
326            .collect::<Vec<_>>();
327        timers.sort_by(|left, right| {
328            left.fire_at
329                .cmp(&right.fire_at)
330                .then_with(|| {
331                    left.workflow_id
332                        .to_string()
333                        .cmp(&right.workflow_id.to_string())
334                })
335                .then_with(|| left.timer_id.to_string().cmp(&right.timer_id.to_string()))
336        });
337        Ok(timers)
338    }
339}
340
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use aion_core::{
        Event, EventEnvelope, Payload, TimerId, WorkflowError, WorkflowFilter, WorkflowId,
        WorkflowStatus,
    };
    use chrono::{DateTime, Utc};
    use serde_json::json;
    use tokio::task;
    use uuid::Uuid;

    use super::InMemoryStore;
    use crate::{ReadableEventStore, StoreError, TimerEntry, WritableEventStore, WriteToken};

    /// Token handed to every `append` call in this suite.
    fn write_token() -> WriteToken {
        WriteToken::recorder()
    }

    /// Deterministic timestamp: a fixed base instant shifted by `offset_seconds`.
    fn recorded_at(offset_seconds: i64) -> DateTime<Utc> {
        let seconds = 1_700_000_000 + offset_seconds;
        DateTime::from_timestamp(seconds, 0).unwrap_or_default()
    }

    /// Deterministic workflow id derived from `value`.
    fn workflow_id(value: u128) -> WorkflowId {
        WorkflowId::new(Uuid::from_u128(value))
    }

    /// Envelope whose `recorded_at` offset equals its sequence number.
    fn envelope(seq: u64, workflow_id: &WorkflowId) -> EventEnvelope {
        let offset_seconds = i64::try_from(seq).unwrap_or_default();
        EventEnvelope {
            seq,
            recorded_at: recorded_at(offset_seconds),
            workflow_id: workflow_id.clone(),
        }
    }

    /// Deterministic run id derived from `value`.
    fn run_id(value: u128) -> aion_core::RunId {
        aion_core::RunId::new(Uuid::from_u128(value))
    }

    /// JSON payload `{"label": label}`; falls back to a payload describing the encoding error.
    fn payload(label: &str) -> Payload {
        match Payload::from_json(&json!({ "label": label })) {
            Ok(encoded) => encoded,
            Err(error) => Payload::new(
                aion_core::ContentType::Json,
                format!("{{\"payload_error\":\"{error}\"}}").into_bytes(),
            ),
        }
    }

    /// `WorkflowStarted` event; every started event in this suite carries `run_id(1)`.
    fn workflow_started(seq: u64, workflow_id: &WorkflowId, workflow_type: &str) -> Event {
        Event::WorkflowStarted {
            envelope: envelope(seq, workflow_id),
            workflow_type: workflow_type.to_owned(),
            input: payload("input"),
            run_id: run_id(1),
            parent_run_id: None,
            package_version: aion_core::PackageVersion::new("a".repeat(64)),
        }
    }

    /// `WorkflowCompleted` event with a fixed result payload.
    fn workflow_completed(seq: u64, workflow_id: &WorkflowId) -> Event {
        Event::WorkflowCompleted {
            envelope: envelope(seq, workflow_id),
            result: payload("result"),
        }
    }

    /// `WorkflowFailed` event with a fixed error message.
    fn workflow_failed(seq: u64, workflow_id: &WorkflowId) -> Event {
        let error = WorkflowError {
            message: String::from("failed"),
            details: None,
        };
        Event::WorkflowFailed {
            envelope: envelope(seq, workflow_id),
            error,
        }
    }

    /// Appends `events` to `workflow_id` using a fresh write token.
    async fn append_events(
        store: &InMemoryStore,
        workflow_id: &WorkflowId,
        events: &[Event],
        expected_seq: u64,
    ) -> Result<(), StoreError> {
        store
            .append(write_token(), workflow_id, events, expected_seq)
            .await
    }

    #[tokio::test]
    async fn read_history_returns_empty_for_unknown_workflow() -> Result<(), StoreError> {
        let store = InMemoryStore::default();
        let history = store.read_history(&workflow_id(1)).await?;

        assert_eq!(history, Vec::new());
        Ok(())
    }

    #[tokio::test]
    async fn append_preserves_sequence_order() -> Result<(), StoreError> {
        let store = InMemoryStore::default();
        let wf = workflow_id(1);
        let started = workflow_started(1, &wf, "checkout");
        let completed = workflow_completed(2, &wf);

        append_events(&store, &wf, std::slice::from_ref(&started), 0).await?;
        append_events(&store, &wf, std::slice::from_ref(&completed), 1).await?;

        assert_eq!(store.read_history(&wf).await?, vec![started, completed]);
        Ok(())
    }

    #[tokio::test]
    async fn list_active_returns_only_running_workflows() -> Result<(), StoreError> {
        let store = InMemoryStore::default();
        let running = workflow_id(1);
        let completed = workflow_id(2);

        let running_events = [workflow_started(1, &running, "checkout")];
        let completed_events = [
            workflow_started(1, &completed, "checkout"),
            workflow_completed(2, &completed),
        ];
        append_events(&store, &running, &running_events, 0).await?;
        append_events(&store, &completed, &completed_events, 0).await?;

        assert_eq!(store.list_active().await?, vec![running]);
        Ok(())
    }

    #[tokio::test]
    async fn list_workflow_ids_returns_running_and_terminal_histories() -> Result<(), StoreError> {
        let store = InMemoryStore::default();
        // The running workflow gets the larger id so the expected order differs from insertion.
        let running = workflow_id(2);
        let completed = workflow_id(1);

        let running_events = [workflow_started(1, &running, "checkout")];
        let completed_events = [
            workflow_started(1, &completed, "checkout"),
            workflow_completed(2, &completed),
        ];
        append_events(&store, &running, &running_events, 0).await?;
        append_events(&store, &completed, &completed_events, 0).await?;

        assert_eq!(store.list_workflow_ids().await?, vec![completed, running]);
        Ok(())
    }

    #[tokio::test]
    async fn read_run_chain_projects_run_id_from_started_event() -> Result<(), StoreError> {
        let store = InMemoryStore::default();
        let wf = workflow_id(1);
        let events = [workflow_started(1, &wf, "checkout"), workflow_completed(2, &wf)];

        append_events(&store, &wf, &events, 0).await?;

        let chain = store.read_run_chain(&wf).await?;

        assert_eq!(chain.len(), 1);
        let run = &chain[0];
        // The run id is taken from the WorkflowStarted event, which the helper fixes to run_id(1).
        assert_eq!(run.run_id, run_id(1));
        assert_eq!(run.status, WorkflowStatus::Completed);
        assert_eq!(run.closed_at, Some(recorded_at(2)));
        Ok(())
    }

    #[tokio::test]
    async fn query_uses_core_filter_semantics() -> Result<(), StoreError> {
        let store = InMemoryStore::default();
        let running_checkout = workflow_id(1);
        let completed_checkout = workflow_id(2);
        let failed_billing = workflow_id(3);

        let running_events = [workflow_started(1, &running_checkout, "checkout")];
        let completed_events = [
            workflow_started(1, &completed_checkout, "checkout"),
            workflow_completed(2, &completed_checkout),
        ];
        let failed_events = [
            workflow_started(1, &failed_billing, "billing"),
            workflow_failed(2, &failed_billing),
        ];
        append_events(&store, &running_checkout, &running_events, 0).await?;
        append_events(&store, &completed_checkout, &completed_events, 0).await?;
        append_events(&store, &failed_billing, &failed_events, 0).await?;

        let started_at = recorded_at(1);
        let filter = WorkflowFilter {
            workflow_type: Some(String::from("checkout")),
            status: Some(WorkflowStatus::Completed),
            started_after: Some(started_at),
            started_before: Some(started_at),
            parent: None,
        };
        let summaries = store.query(&filter).await?;

        assert_eq!(summaries.len(), 1);
        let only = &summaries[0];
        assert_eq!(only.workflow_id, completed_checkout);
        assert_eq!(only.status, WorkflowStatus::Completed);
        Ok(())
    }

    #[tokio::test]
    async fn stale_expected_sequence_writes_nothing() -> Result<(), StoreError> {
        let store = InMemoryStore::default();
        let wf = workflow_id(1);
        let started = workflow_started(1, &wf, "checkout");

        append_events(&store, &wf, std::slice::from_ref(&started), 0).await?;

        // The head is now 1, so a second writer still expecting 0 must be rejected.
        let conflict = append_events(&store, &wf, &[workflow_completed(2, &wf)], 0).await;

        let expected_error = StoreError::SequenceConflict {
            expected: 0,
            found: 1,
        };
        assert_eq!(conflict, Err(expected_error));
        assert_eq!(store.read_history(&wf).await?, vec![started]);
        Ok(())
    }

    #[tokio::test]
    async fn append_rejects_non_contiguous_event_sequences() -> Result<(), StoreError> {
        let store = InMemoryStore::default();
        let wf = workflow_id(1);
        // Sequence jumps from 1 to 5.
        let gapped = [workflow_started(1, &wf, "checkout"), workflow_completed(5, &wf)];

        let result = append_events(&store, &wf, &gapped, 0).await;

        assert!(result.is_err());
        assert!(matches!(result, Err(StoreError::Backend(_))));
        assert_eq!(store.read_history(&wf).await?, Vec::new());
        Ok(())
    }

    #[tokio::test]
    async fn concurrent_appends_on_same_expected_sequence_conflict_once() -> Result<(), StoreError>
    {
        let store = Arc::new(InMemoryStore::default());
        let wf = workflow_id(1);

        let started_task = {
            let store = Arc::clone(&store);
            let wf = wf.clone();
            task::spawn(async move {
                store
                    .append(
                        write_token(),
                        &wf,
                        &[workflow_started(1, &wf, "checkout")],
                        0,
                    )
                    .await
            })
        };
        let completed_task = {
            let store = Arc::clone(&store);
            let wf = wf.clone();
            task::spawn(async move {
                store
                    .append(write_token(), &wf, &[workflow_completed(1, &wf)], 0)
                    .await
            })
        };

        let started_outcome = started_task
            .await
            .map_err(|error| StoreError::Backend(format!("append task failed: {error}")))?;
        let completed_outcome = completed_task
            .await
            .map_err(|error| StoreError::Backend(format!("append task failed: {error}")))?;
        let outcomes = [started_outcome, completed_outcome];

        let successes = outcomes.iter().filter(|outcome| outcome.is_ok()).count();
        let conflicts = outcomes
            .iter()
            .filter(|outcome| {
                matches!(
                    outcome,
                    Err(StoreError::SequenceConflict {
                        expected: 0,
                        found: 1
                    })
                )
            })
            .count();

        assert_eq!(successes, 1);
        assert_eq!(conflicts, 1);
        assert_eq!(store.read_history(&wf).await?.len(), 1);
        Ok(())
    }

    #[tokio::test]
    async fn rescheduling_same_timer_replaces_prior_fire_at() -> Result<(), StoreError> {
        let store = InMemoryStore::default();
        let wf = workflow_id(1);
        let timer = TimerId::anonymous(1);
        let original_fire_at = recorded_at(10);
        let rescheduled_fire_at = recorded_at(30);

        store.schedule_timer(&wf, &timer, original_fire_at).await?;
        store.schedule_timer(&wf, &timer, rescheduled_fire_at).await?;

        assert_eq!(store.expired_timers(original_fire_at).await?, Vec::new());

        let expected = vec![TimerEntry {
            workflow_id: wf,
            timer_id: timer,
            fire_at: rescheduled_fire_at,
        }];
        assert_eq!(store.expired_timers(rescheduled_fire_at).await?, expected);
        Ok(())
    }

    #[tokio::test]
    async fn expired_timers_include_boundary_and_exclude_future() -> Result<(), StoreError> {
        let store = InMemoryStore::default();
        let wf = workflow_id(1);
        let past_timer = TimerId::anonymous(1);
        let boundary_timer = TimerId::anonymous(2);
        let future_timer = TimerId::anonymous(3);
        let as_of = recorded_at(20);

        // Scheduled latest-first so the expected order cannot come from insertion order.
        store.schedule_timer(&wf, &future_timer, recorded_at(30)).await?;
        store.schedule_timer(&wf, &boundary_timer, as_of).await?;
        store.schedule_timer(&wf, &past_timer, recorded_at(10)).await?;

        let expected = vec![
            TimerEntry {
                workflow_id: wf.clone(),
                timer_id: past_timer,
                fire_at: recorded_at(10),
            },
            TimerEntry {
                workflow_id: wf,
                timer_id: boundary_timer,
                fire_at: as_of,
            },
        ];
        assert_eq!(store.expired_timers(as_of).await?, expected);
        Ok(())
    }
}