1use std::collections::HashMap;
4use std::sync::{Mutex, MutexGuard};
5
6use aion_core::{
7 Event, TimerId, WorkflowFilter, WorkflowId, WorkflowStatus, WorkflowSummary, status_from_events,
8};
9use async_trait::async_trait;
10use chrono::{DateTime, Utc};
11
12use crate::package::{PackageRecord, PackageRouteRecord, PackageStore};
13use crate::visibility::{ListWorkflowsFilter, VisibilityRecord, VisibilityStore};
14use crate::{
15 ReadableEventStore, RunSummary, StoreError, TimerEntry, WritableEventStore, WriteToken,
16};
17
18#[derive(Debug, Default)]
20pub struct InMemoryStore {
21 state: Mutex<InMemoryState>,
22}
23
24#[async_trait]
25impl VisibilityStore for InMemoryStore {
26 async fn record_visibility(&self, record: VisibilityRecord) -> Result<(), StoreError> {
27 let mut state = self.lock_state()?;
28 state
29 .visibility
30 .insert((record.workflow_id.clone(), record.run_id.clone()), record);
31 Ok(())
32 }
33
34 async fn list_workflows(
35 &self,
36 filter: ListWorkflowsFilter,
37 ) -> Result<Vec<crate::visibility::WorkflowSummary>, StoreError> {
38 let state = self.lock_state()?;
39 let mut summaries = state
40 .visibility
41 .values()
42 .cloned()
43 .map(crate::visibility::WorkflowSummary::from)
44 .filter(|summary| filter.matches(summary))
45 .collect::<Vec<_>>();
46 summaries.sort_by(|left, right| {
47 left.start_time.cmp(&right.start_time).then_with(|| {
48 left.workflow_id
49 .to_string()
50 .cmp(&right.workflow_id.to_string())
51 })
52 });
53 let offset = filter.offset.and_then(|value| usize::try_from(value).ok());
54 if let Some(offset) = offset {
55 summaries = summaries.into_iter().skip(offset).collect();
56 }
57 if let Some(limit) = filter.limit.and_then(|value| usize::try_from(value).ok()) {
58 summaries.truncate(limit);
59 }
60 Ok(summaries)
61 }
62
63 async fn count_workflows(&self, filter: ListWorkflowsFilter) -> Result<u64, StoreError> {
64 let state = self.lock_state()?;
65 Ok(state
66 .visibility
67 .values()
68 .cloned()
69 .map(crate::visibility::WorkflowSummary::from)
70 .filter(|summary| filter.matches(summary))
71 .count()
72 .try_into()
73 .unwrap_or(u64::MAX))
74 }
75}
76
77#[derive(Debug, Default)]
78struct InMemoryState {
79 histories: HashMap<WorkflowId, Vec<Event>>,
80 timers: HashMap<(WorkflowId, TimerId), TimerEntry>,
81 visibility: HashMap<(WorkflowId, aion_core::RunId), VisibilityRecord>,
82 packages: HashMap<(String, String), PackageRecord>,
83 package_routes: HashMap<String, String>,
84}
85
86impl InMemoryStore {
87 fn lock_state(&self) -> Result<MutexGuard<'_, InMemoryState>, StoreError> {
88 self.state
89 .lock()
90 .map_err(|error| StoreError::Backend(format!("in-memory store lock poisoned: {error}")))
91 }
92}
93
94fn history_head(history: &[Event]) -> u64 {
95 history.iter().map(Event::seq).max().unwrap_or_default()
96}
97
98fn history_in_sequence_order(history: &[Event]) -> Vec<Event> {
99 let mut ordered = history.to_vec();
100 ordered.sort_by_key(Event::seq);
101 ordered
102}
103
104#[async_trait]
105impl PackageStore for InMemoryStore {
106 async fn put_package(&self, record: PackageRecord) -> Result<(), StoreError> {
107 let mut state = self.lock_state()?;
108 state
109 .package_routes
110 .insert(record.workflow_type.clone(), record.content_hash.clone());
111 state.packages.insert(
112 (record.workflow_type.clone(), record.content_hash.clone()),
113 record,
114 );
115 Ok(())
116 }
117
118 async fn list_packages(&self) -> Result<Vec<PackageRecord>, StoreError> {
119 let state = self.lock_state()?;
120 let mut records: Vec<PackageRecord> = state.packages.values().cloned().collect();
121 records.sort_by(|left, right| {
122 left.deployed_at
123 .cmp(&right.deployed_at)
124 .then_with(|| left.workflow_type.cmp(&right.workflow_type))
125 .then_with(|| left.content_hash.cmp(&right.content_hash))
126 });
127 Ok(records)
128 }
129
130 async fn delete_package(
131 &self,
132 workflow_type: &str,
133 content_hash: &str,
134 ) -> Result<(), StoreError> {
135 let mut state = self.lock_state()?;
136 state
137 .packages
138 .remove(&(workflow_type.to_owned(), content_hash.to_owned()));
139 Ok(())
140 }
141
142 async fn put_package_route(
143 &self,
144 workflow_type: &str,
145 content_hash: &str,
146 ) -> Result<(), StoreError> {
147 let mut state = self.lock_state()?;
148 state
149 .package_routes
150 .insert(workflow_type.to_owned(), content_hash.to_owned());
151 Ok(())
152 }
153
154 async fn list_package_routes(&self) -> Result<Vec<PackageRouteRecord>, StoreError> {
155 let state = self.lock_state()?;
156 let mut routes: Vec<PackageRouteRecord> = state
157 .package_routes
158 .iter()
159 .map(|(workflow_type, content_hash)| PackageRouteRecord {
160 workflow_type: workflow_type.clone(),
161 content_hash: content_hash.clone(),
162 })
163 .collect();
164 routes.sort_by(|left, right| left.workflow_type.cmp(&right.workflow_type));
165 Ok(routes)
166 }
167}
168
169#[async_trait]
170impl WritableEventStore for InMemoryStore {
171 async fn append(
172 &self,
173 _token: WriteToken,
174 workflow_id: &WorkflowId,
175 events: &[Event],
176 expected_seq: u64,
177 ) -> Result<(), StoreError> {
178 let mut state = self.lock_state()?;
179 let current_head = state
180 .histories
181 .get(workflow_id)
182 .map_or(0, |history| history_head(history));
183
184 if current_head != expected_seq {
185 return Err(StoreError::SequenceConflict {
186 expected: expected_seq,
187 found: current_head,
188 });
189 }
190
191 if events.is_empty() {
192 return Ok(());
193 }
194
195 let mut next_seq = expected_seq + 1;
196 for event in events {
197 if event.seq() != next_seq {
198 return Err(StoreError::Backend(format!(
199 "event sequence must be contiguous: expected {next_seq}, got {}",
200 event.seq()
201 )));
202 }
203 next_seq += 1;
204 }
205
206 state
207 .histories
208 .entry(workflow_id.clone())
209 .or_default()
210 .extend(events.iter().cloned());
211 Ok(())
212 }
213}
214
215#[async_trait]
216impl ReadableEventStore for InMemoryStore {
217 async fn read_history(&self, workflow_id: &WorkflowId) -> Result<Vec<Event>, StoreError> {
218 let state = self.lock_state()?;
219 Ok(state
220 .histories
221 .get(workflow_id)
222 .map_or_else(Vec::new, |history| history_in_sequence_order(history)))
223 }
224
225 async fn read_history_from(
226 &self,
227 workflow_id: &WorkflowId,
228 from_seq: u64,
229 ) -> Result<Vec<Event>, StoreError> {
230 let state = self.lock_state()?;
231 Ok(state
232 .histories
233 .get(workflow_id)
234 .map_or_else(Vec::new, |history| {
235 let mut events = history
236 .iter()
237 .filter(|event| event.seq() >= from_seq)
238 .cloned()
239 .collect::<Vec<_>>();
240 events.sort_by_key(Event::seq);
241 events
242 }))
243 }
244
245 async fn read_run_chain(
246 &self,
247 workflow_id: &WorkflowId,
248 ) -> Result<Vec<RunSummary>, StoreError> {
249 let state = self.lock_state()?;
250 let Some(history) = state.histories.get(workflow_id) else {
251 return Ok(Vec::new());
252 };
253
254 crate::run_chain::run_chain_from_history(history)
255 }
256
257 async fn list_workflow_ids(&self) -> Result<Vec<WorkflowId>, StoreError> {
258 let state = self.lock_state()?;
259 let mut workflow_ids = state.histories.keys().cloned().collect::<Vec<_>>();
260 workflow_ids.sort_by_key(ToString::to_string);
261 Ok(workflow_ids)
262 }
263
264 async fn list_active(&self) -> Result<Vec<WorkflowId>, StoreError> {
265 let state = self.lock_state()?;
266 let mut active = state
267 .histories
268 .iter()
269 .filter(|(_, history)| {
270 matches!(
271 status_from_events(&history_in_sequence_order(history)),
272 WorkflowStatus::Running
273 )
274 })
275 .map(|(workflow_id, _)| workflow_id.clone())
276 .collect::<Vec<_>>();
277 active.sort_by_key(ToString::to_string);
278 Ok(active)
279 }
280
281 async fn query(&self, filter: &WorkflowFilter) -> Result<Vec<WorkflowSummary>, StoreError> {
282 let state = self.lock_state()?;
283 let mut summaries = state
284 .histories
285 .values()
286 .filter_map(|history| {
287 WorkflowSummary::from_history(&history_in_sequence_order(history))
288 })
289 .filter(|summary| filter.matches(summary))
290 .collect::<Vec<_>>();
291 summaries.sort_by(|left, right| {
292 left.started_at.cmp(&right.started_at).then_with(|| {
293 left.workflow_id
294 .to_string()
295 .cmp(&right.workflow_id.to_string())
296 })
297 });
298 Ok(summaries)
299 }
300
301 async fn schedule_timer(
302 &self,
303 workflow_id: &WorkflowId,
304 timer_id: &TimerId,
305 fire_at: DateTime<Utc>,
306 ) -> Result<(), StoreError> {
307 let mut state = self.lock_state()?;
308 state.timers.insert(
309 (workflow_id.clone(), timer_id.clone()),
310 TimerEntry {
311 workflow_id: workflow_id.clone(),
312 timer_id: timer_id.clone(),
313 fire_at,
314 },
315 );
316 Ok(())
317 }
318
319 async fn expired_timers(&self, as_of: DateTime<Utc>) -> Result<Vec<TimerEntry>, StoreError> {
320 let state = self.lock_state()?;
321 let mut timers = state
322 .timers
323 .values()
324 .filter(|entry| entry.fire_at <= as_of)
325 .cloned()
326 .collect::<Vec<_>>();
327 timers.sort_by(|left, right| {
328 left.fire_at
329 .cmp(&right.fire_at)
330 .then_with(|| {
331 left.workflow_id
332 .to_string()
333 .cmp(&right.workflow_id.to_string())
334 })
335 .then_with(|| left.timer_id.to_string().cmp(&right.timer_id.to_string()))
336 });
337 Ok(timers)
338 }
339}
340
/// Behavioral tests for the event-store and timer sides of `InMemoryStore`.
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use aion_core::{
        Event, EventEnvelope, Payload, TimerId, WorkflowError, WorkflowFilter, WorkflowId,
        WorkflowStatus,
    };
    use chrono::{DateTime, Utc};
    use serde_json::json;
    use tokio::task;
    use uuid::Uuid;

    use super::InMemoryStore;
    use crate::{ReadableEventStore, StoreError, TimerEntry, WritableEventStore, WriteToken};

    /// Write token used by every append in these tests.
    fn write_token() -> WriteToken {
        WriteToken::recorder()
    }

    /// Fixed base timestamp shifted by `offset_seconds`.
    fn recorded_at(offset_seconds: i64) -> DateTime<Utc> {
        DateTime::from_timestamp(1_700_000_000 + offset_seconds, 0).unwrap_or_default()
    }

    /// Deterministic workflow id derived from `value`.
    fn workflow_id(value: u128) -> WorkflowId {
        WorkflowId::new(Uuid::from_u128(value))
    }

    /// Envelope whose `recorded_at` offset equals its sequence number.
    fn envelope(seq: u64, workflow_id: &WorkflowId) -> EventEnvelope {
        EventEnvelope {
            seq,
            recorded_at: recorded_at(i64::try_from(seq).unwrap_or_default()),
            workflow_id: workflow_id.clone(),
        }
    }

    /// Deterministic run id derived from `value`.
    fn run_id(value: u128) -> aion_core::RunId {
        aion_core::RunId::new(Uuid::from_u128(value))
    }

    /// JSON payload `{"label": label}`; a serialization failure is embedded
    /// in the payload instead of panicking.
    fn payload(label: &str) -> Payload {
        Payload::from_json(&json!({ "label": label })).unwrap_or_else(|error| {
            Payload::new(
                aion_core::ContentType::Json,
                format!("{{\"payload_error\":\"{error}\"}}").into_bytes(),
            )
        })
    }

    /// `WorkflowStarted` event at `seq`; always carries `run_id(1)`.
    fn workflow_started(seq: u64, workflow_id: &WorkflowId, workflow_type: &str) -> Event {
        Event::WorkflowStarted {
            envelope: envelope(seq, workflow_id),
            workflow_type: workflow_type.to_owned(),
            input: payload("input"),
            run_id: run_id(1),
            parent_run_id: None,
            package_version: aion_core::PackageVersion::new("a".repeat(64)),
        }
    }

    /// `WorkflowCompleted` event at `seq`.
    fn workflow_completed(seq: u64, workflow_id: &WorkflowId) -> Event {
        Event::WorkflowCompleted {
            envelope: envelope(seq, workflow_id),
            result: payload("result"),
        }
    }

    /// `WorkflowFailed` event at `seq`.
    fn workflow_failed(seq: u64, workflow_id: &WorkflowId) -> Event {
        Event::WorkflowFailed {
            envelope: envelope(seq, workflow_id),
            error: WorkflowError {
                message: String::from("failed"),
                details: None,
            },
        }
    }

    #[tokio::test]
    async fn read_history_returns_empty_for_unknown_workflow() -> Result<(), StoreError> {
        let store = InMemoryStore::default();

        assert_eq!(store.read_history(&workflow_id(1)).await?, Vec::new());
        Ok(())
    }

    #[tokio::test]
    async fn append_preserves_sequence_order() -> Result<(), StoreError> {
        let store = InMemoryStore::default();
        let workflow_id = workflow_id(1);
        let first = workflow_started(1, &workflow_id, "checkout");
        let second = workflow_completed(2, &workflow_id);

        store
            .append(write_token(), &workflow_id, std::slice::from_ref(&first), 0)
            .await?;
        store
            .append(
                write_token(),
                &workflow_id,
                std::slice::from_ref(&second),
                1,
            )
            .await?;

        assert_eq!(store.read_history(&workflow_id).await?, vec![first, second]);
        Ok(())
    }

    #[tokio::test]
    async fn read_history_from_returns_events_at_or_after_sequence() -> Result<(), StoreError> {
        let store = InMemoryStore::default();
        let wf = workflow_id(1);
        let first = workflow_started(1, &wf, "checkout");
        let second = workflow_completed(2, &wf);

        store
            .append(write_token(), &wf, &[first, second.clone()], 0)
            .await?;

        assert_eq!(store.read_history_from(&wf, 2).await?, vec![second]);
        assert!(store.read_history_from(&wf, 3).await?.is_empty());
        assert!(store.read_history_from(&workflow_id(9), 0).await?.is_empty());
        Ok(())
    }

    #[tokio::test]
    async fn empty_append_checks_sequence_but_stores_nothing() -> Result<(), StoreError> {
        let store = InMemoryStore::default();
        let wf = workflow_id(1);

        store.append(write_token(), &wf, &[], 0).await?;
        assert!(store.list_workflow_ids().await?.is_empty());

        // The head check runs before the empty-batch shortcut.
        let conflict = store.append(write_token(), &wf, &[], 1).await;
        assert_eq!(
            conflict,
            Err(StoreError::SequenceConflict {
                expected: 1,
                found: 0,
            })
        );
        Ok(())
    }

    #[tokio::test]
    async fn list_active_returns_only_running_workflows() -> Result<(), StoreError> {
        let store = InMemoryStore::default();
        let running = workflow_id(1);
        let completed = workflow_id(2);

        store
            .append(
                write_token(),
                &running,
                &[workflow_started(1, &running, "checkout")],
                0,
            )
            .await?;
        store
            .append(
                write_token(),
                &completed,
                &[
                    workflow_started(1, &completed, "checkout"),
                    workflow_completed(2, &completed),
                ],
                0,
            )
            .await?;

        assert_eq!(store.list_active().await?, vec![running]);
        Ok(())
    }

    #[tokio::test]
    async fn list_workflow_ids_returns_running_and_terminal_histories() -> Result<(), StoreError> {
        let store = InMemoryStore::default();
        let running = workflow_id(2);
        let completed = workflow_id(1);

        store
            .append(
                write_token(),
                &running,
                &[workflow_started(1, &running, "checkout")],
                0,
            )
            .await?;
        store
            .append(
                write_token(),
                &completed,
                &[
                    workflow_started(1, &completed, "checkout"),
                    workflow_completed(2, &completed),
                ],
                0,
            )
            .await?;

        assert_eq!(store.list_workflow_ids().await?, vec![completed, running]);
        Ok(())
    }

    #[tokio::test]
    async fn read_run_chain_projects_run_id_from_started_event() -> Result<(), StoreError> {
        let store = InMemoryStore::default();
        let workflow_id = workflow_id(1);

        store
            .append(
                write_token(),
                &workflow_id,
                &[
                    workflow_started(1, &workflow_id, "checkout"),
                    workflow_completed(2, &workflow_id),
                ],
                0,
            )
            .await?;

        let chain = store.read_run_chain(&workflow_id).await?;

        assert_eq!(chain.len(), 1);
        assert_eq!(chain[0].run_id, run_id(1));
        assert_eq!(chain[0].status, WorkflowStatus::Completed);
        assert_eq!(chain[0].closed_at, Some(recorded_at(2)));
        Ok(())
    }

    #[tokio::test]
    async fn query_uses_core_filter_semantics() -> Result<(), StoreError> {
        let store = InMemoryStore::default();
        let running_checkout = workflow_id(1);
        let completed_checkout = workflow_id(2);
        let failed_billing = workflow_id(3);

        store
            .append(
                write_token(),
                &running_checkout,
                &[workflow_started(1, &running_checkout, "checkout")],
                0,
            )
            .await?;
        store
            .append(
                write_token(),
                &completed_checkout,
                &[
                    workflow_started(1, &completed_checkout, "checkout"),
                    workflow_completed(2, &completed_checkout),
                ],
                0,
            )
            .await?;
        store
            .append(
                write_token(),
                &failed_billing,
                &[
                    workflow_started(1, &failed_billing, "billing"),
                    workflow_failed(2, &failed_billing),
                ],
                0,
            )
            .await?;

        let filter = WorkflowFilter {
            workflow_type: Some(String::from("checkout")),
            status: Some(WorkflowStatus::Completed),
            started_after: Some(recorded_at(1)),
            started_before: Some(recorded_at(1)),
            parent: None,
        };
        let summaries = store.query(&filter).await?;

        assert_eq!(summaries.len(), 1);
        assert_eq!(summaries[0].workflow_id, completed_checkout);
        assert_eq!(summaries[0].status, WorkflowStatus::Completed);
        Ok(())
    }

    #[tokio::test]
    async fn stale_expected_sequence_writes_nothing() -> Result<(), StoreError> {
        let store = InMemoryStore::default();
        let workflow_id = workflow_id(1);
        let first = workflow_started(1, &workflow_id, "checkout");

        store
            .append(write_token(), &workflow_id, std::slice::from_ref(&first), 0)
            .await?;
        let conflict = store
            .append(
                write_token(),
                &workflow_id,
                &[workflow_completed(2, &workflow_id)],
                0,
            )
            .await;

        assert_eq!(
            conflict,
            Err(StoreError::SequenceConflict {
                expected: 0,
                found: 1,
            })
        );
        assert_eq!(store.read_history(&workflow_id).await?, vec![first]);
        Ok(())
    }

    #[tokio::test]
    async fn append_rejects_non_contiguous_event_sequences() -> Result<(), StoreError> {
        let store = InMemoryStore::default();
        let wf = workflow_id(1);

        let result = store
            .append(
                write_token(),
                &wf,
                &[
                    workflow_started(1, &wf, "checkout"),
                    workflow_completed(5, &wf),
                ],
                0,
            )
            .await;

        assert!(matches!(result, Err(StoreError::Backend(_))));
        // The batch is rejected as a whole, including its valid first event.
        assert_eq!(store.read_history(&wf).await?, Vec::new());
        Ok(())
    }

    #[tokio::test]
    async fn concurrent_appends_on_same_expected_sequence_conflict_once() -> Result<(), StoreError>
    {
        let store = Arc::new(InMemoryStore::default());
        let workflow_id = workflow_id(1);
        let first_store = Arc::clone(&store);
        let first_workflow = workflow_id.clone();
        let second_store = Arc::clone(&store);
        let second_workflow = workflow_id.clone();

        let first = task::spawn(async move {
            first_store
                .append(
                    write_token(),
                    &first_workflow,
                    &[workflow_started(1, &first_workflow, "checkout")],
                    0,
                )
                .await
        });
        let second = task::spawn(async move {
            second_store
                .append(
                    write_token(),
                    &second_workflow,
                    &[workflow_completed(1, &second_workflow)],
                    0,
                )
                .await
        });

        let results = [
            first
                .await
                .map_err(|error| StoreError::Backend(format!("append task failed: {error}")))?,
            second
                .await
                .map_err(|error| StoreError::Backend(format!("append task failed: {error}")))?,
        ];

        // Exactly one writer wins; the other sees the head it lost to.
        assert_eq!(results.iter().filter(|result| result.is_ok()).count(), 1);
        assert_eq!(
            results
                .iter()
                .filter(|result| matches!(
                    result,
                    Err(StoreError::SequenceConflict {
                        expected: 0,
                        found: 1
                    })
                ))
                .count(),
            1
        );
        assert_eq!(store.read_history(&workflow_id).await?.len(), 1);
        Ok(())
    }

    #[tokio::test]
    async fn rescheduling_same_timer_replaces_prior_fire_at() -> Result<(), StoreError> {
        let store = InMemoryStore::default();
        let workflow_id = workflow_id(1);
        let timer_id = TimerId::anonymous(1);
        let first_fire_at = recorded_at(10);
        let replacement_fire_at = recorded_at(30);

        store
            .schedule_timer(&workflow_id, &timer_id, first_fire_at)
            .await?;
        store
            .schedule_timer(&workflow_id, &timer_id, replacement_fire_at)
            .await?;

        assert_eq!(store.expired_timers(first_fire_at).await?, Vec::new());
        assert_eq!(
            store.expired_timers(replacement_fire_at).await?,
            vec![TimerEntry {
                workflow_id,
                timer_id,
                fire_at: replacement_fire_at,
            }]
        );
        Ok(())
    }

    #[tokio::test]
    async fn expired_timers_include_boundary_and_exclude_future() -> Result<(), StoreError> {
        let store = InMemoryStore::default();
        let workflow_id = workflow_id(1);
        let past_timer = TimerId::anonymous(1);
        let boundary_timer = TimerId::anonymous(2);
        let future_timer = TimerId::anonymous(3);
        let as_of = recorded_at(20);

        // Scheduled out of order on purpose: the result must come back sorted.
        store
            .schedule_timer(&workflow_id, &future_timer, recorded_at(30))
            .await?;
        store
            .schedule_timer(&workflow_id, &boundary_timer, as_of)
            .await?;
        store
            .schedule_timer(&workflow_id, &past_timer, recorded_at(10))
            .await?;

        assert_eq!(
            store.expired_timers(as_of).await?,
            vec![
                TimerEntry {
                    workflow_id: workflow_id.clone(),
                    timer_id: past_timer,
                    fire_at: recorded_at(10),
                },
                TimerEntry {
                    workflow_id,
                    timer_id: boundary_timer,
                    fire_at: as_of,
                },
            ]
        );
        Ok(())
    }
}