use std::collections::HashMap;
use std::sync::Arc;
use aion_core::{
ActivityError, ActivityId, Event, EventEnvelope, PackageVersion, Payload, RunId,
ScheduleConfig, ScheduleId, SearchAttributeSchema, SearchAttributeValue, TimerId,
WithTimeoutOutcome, WorkflowError, WorkflowId,
};
use aion_store::visibility::{VisibilityRecord, VisibilityStore};
use aion_store::{EventStore, WriteToken};
use chrono::{DateTime, Utc};
use crate::durability::{DurabilityError, seq::SequenceHead};
/// Inputs for recording the start of a workflow run.
///
/// Every field is moved unchanged into the same-named field of
/// `Event::WorkflowStarted` when the start is recorded.
#[derive(Clone, Debug)]
pub struct WorkflowStartRecord {
/// Workflow type name written to the start event.
pub workflow_type: String,
/// Input payload written to the start event.
pub input: Payload,
/// Run identifier written to the start event.
pub run_id: RunId,
/// Parent run identifier written to the start event, if there is one.
pub parent_run_id: Option<RunId>,
/// Package version written to the start event.
pub package_version: PackageVersion,
}
/// Appends events for one workflow to an [`EventStore`] while tracking the
/// sequence head that each append is expected to build on.
pub struct Recorder {
/// Workflow whose history this recorder reads and appends to.
workflow_id: WorkflowId,
/// Store that receives appended events and serves history reads.
store: Arc<dyn EventStore>,
/// Local sequence head; its current value is passed to the store as the
/// expected sequence on every append and advanced after a successful one.
sequence: SequenceHead,
/// Token handed to the store on every append (built with `WriteToken::recorder()`).
write_token: WriteToken,
/// Optional visibility projection target; when `None`, visibility upserts are skipped.
visibility: Option<RecorderVisibility>,
}
/// Visibility projection settings attached through `Recorder::with_visibility`.
struct RecorderVisibility {
/// Run identifier stamped onto every projected `VisibilityRecord`.
run_id: RunId,
/// Store that receives the projected visibility records.
store: Arc<dyn VisibilityStore>,
}
impl Recorder {
/// Creates a recorder whose sequence head starts at `0`.
///
/// Equivalent to [`Recorder::resume_at`] with a head of `0`.
#[must_use]
pub fn new(workflow_id: WorkflowId, store: Arc<dyn EventStore>) -> Self {
    const INITIAL_HEAD: u64 = 0;
    Self::resume_at(workflow_id, store, INITIAL_HEAD)
}
/// Creates a recorder whose sequence head starts at `head`.
///
/// The recorder is built without a visibility projection; attach one with
/// [`Recorder::with_visibility`].
#[must_use]
pub fn resume_at(workflow_id: WorkflowId, store: Arc<dyn EventStore>, head: u64) -> Self {
    let write_token = WriteToken::recorder();
    let sequence = SequenceHead::from_head(head);
    Self {
        workflow_id,
        store,
        sequence,
        write_token,
        visibility: None,
    }
}
#[must_use]
pub fn with_visibility(mut self, run_id: RunId, store: Arc<dyn VisibilityStore>) -> Self {
self.visibility = Some(RecorderVisibility { run_id, store });
self
}
#[must_use]
pub const fn workflow_id(&self) -> &WorkflowId {
&self.workflow_id
}
/// Returns the recorder's current sequence head.
#[must_use]
pub const fn current_head(&self) -> u64 {
    let head = &self.sequence;
    head.current()
}
/// Reads this workflow's event history from the store.
///
/// # Errors
///
/// Returns a [`DurabilityError`] converted from the store's error when the
/// read fails.
pub async fn read_history(&self) -> Result<Vec<Event>, DurabilityError> {
    let history = self.store.read_history(&self.workflow_id).await?;
    Ok(history)
}
pub async fn record_workflow_started(
&mut self,
recorded_at: DateTime<Utc>,
start: WorkflowStartRecord,
) -> Result<(), DurabilityError> {
let WorkflowStartRecord {
workflow_type,
input,
run_id,
parent_run_id,
package_version,
} = start;
self.append_with(recorded_at, |envelope| Event::WorkflowStarted {
envelope,
workflow_type,
input,
run_id,
parent_run_id,
package_version,
})
.await
}
/// Appends `Event::WorkflowStarted` and `Event::SearchAttributesUpdated` in
/// a single store append.
///
/// With an empty `attributes` map this delegates to
/// [`Recorder::record_workflow_started`] and only the start event is
/// written. Otherwise every attribute is validated against `schema` before
/// anything is appended, and the two events share `recorded_at` and take
/// consecutive sequence numbers.
///
/// NOTE(review): unlike `record_search_attributes_updated`, this method
/// does not refresh the visibility projection after appending — confirm
/// that the caller takes care of it.
///
/// # Errors
///
/// Returns a [`DurabilityError`] if an attribute fails validation, if a
/// sequence number cannot be allocated, or if appending the batch fails.
pub async fn record_workflow_started_with_attributes(
    &mut self,
    recorded_at: DateTime<Utc>,
    start: WorkflowStartRecord,
    attributes: HashMap<String, SearchAttributeValue>,
    schema: &SearchAttributeSchema,
) -> Result<(), DurabilityError> {
    // No attributes: the single-event start path is sufficient.
    if attributes.is_empty() {
        return self.record_workflow_started(recorded_at, start).await;
    }

    // Validate before building any event so a rejection appends nothing.
    for (name, value) in &attributes {
        schema.validate(name, value)?;
    }

    let started_envelope = self.next_envelope(recorded_at)?;
    let attributes_envelope = self.envelope_after(&started_envelope, recorded_at)?;

    let started = Event::WorkflowStarted {
        envelope: started_envelope,
        workflow_type: start.workflow_type,
        input: start.input,
        run_id: start.run_id,
        parent_run_id: start.parent_run_id,
        package_version: start.package_version,
    };
    let indexed = Event::SearchAttributesUpdated {
        envelope: attributes_envelope,
        workflow_id: self.workflow_id.clone(),
        attributes,
    };
    let batch = [started, indexed];

    let expected_seq = self.sequence.current();
    self.store
        .append(self.write_token, &self.workflow_id, &batch, expected_seq)
        .await?;
    // Advance the local head by the number of events just written.
    self.sequence.mark_append_success(batch.len())
}
pub async fn record_schedule_created(
&mut self,
recorded_at: DateTime<Utc>,
schedule_id: ScheduleId,
config: ScheduleConfig,
) -> Result<(), DurabilityError> {
self.append_with(recorded_at, |envelope| Event::ScheduleCreated {
envelope,
schedule_id,
config,
})
.await
}
pub async fn record_schedule_updated(
&mut self,
recorded_at: DateTime<Utc>,
schedule_id: ScheduleId,
config: ScheduleConfig,
) -> Result<(), DurabilityError> {
self.append_with(recorded_at, |envelope| Event::ScheduleUpdated {
envelope,
schedule_id,
config,
})
.await
}
pub async fn record_schedule_paused(
&mut self,
recorded_at: DateTime<Utc>,
schedule_id: ScheduleId,
) -> Result<(), DurabilityError> {
self.append_with(recorded_at, |envelope| Event::SchedulePaused {
envelope,
schedule_id,
})
.await
}
pub async fn record_schedule_resumed(
&mut self,
recorded_at: DateTime<Utc>,
schedule_id: ScheduleId,
) -> Result<(), DurabilityError> {
self.append_with(recorded_at, |envelope| Event::ScheduleResumed {
envelope,
schedule_id,
})
.await
}
pub async fn record_schedule_deleted(
&mut self,
recorded_at: DateTime<Utc>,
schedule_id: ScheduleId,
) -> Result<(), DurabilityError> {
self.append_with(recorded_at, |envelope| Event::ScheduleDeleted {
envelope,
schedule_id,
})
.await
}
pub async fn record_schedule_triggered(
&mut self,
recorded_at: DateTime<Utc>,
schedule_id: ScheduleId,
workflow_id: WorkflowId,
run_id: RunId,
) -> Result<(), DurabilityError> {
self.append_with(recorded_at, |envelope| Event::ScheduleTriggered {
envelope,
schedule_id,
workflow_id,
run_id,
})
.await
}
pub async fn record_workflow_completed(
&mut self,
recorded_at: DateTime<Utc>,
result: Payload,
) -> Result<(), DurabilityError> {
self.append_with(recorded_at, |envelope| Event::WorkflowCompleted {
envelope,
result,
})
.await
}
pub async fn record_workflow_failed(
&mut self,
recorded_at: DateTime<Utc>,
error: WorkflowError,
) -> Result<(), DurabilityError> {
self.append_with(recorded_at, |envelope| Event::WorkflowFailed {
envelope,
error,
})
.await
}
pub async fn record_workflow_cancelled(
&mut self,
recorded_at: DateTime<Utc>,
reason: String,
) -> Result<(), DurabilityError> {
self.append_with(recorded_at, |envelope| Event::WorkflowCancelled {
envelope,
reason,
})
.await
}
pub async fn record_workflow_continued_as_new(
&mut self,
recorded_at: DateTime<Utc>,
input: Payload,
workflow_type: Option<String>,
parent_run_id: RunId,
) -> Result<(), DurabilityError> {
self.append_with(recorded_at, |envelope| Event::WorkflowContinuedAsNew {
envelope,
input,
workflow_type,
parent_run_id,
})
.await?;
self.upsert_visibility_projection_nonfatal().await;
Ok(())
}
pub async fn record_search_attributes_updated(
&mut self,
recorded_at: DateTime<Utc>,
attributes: HashMap<String, SearchAttributeValue>,
schema: &SearchAttributeSchema,
) -> Result<(), DurabilityError> {
for (name, value) in &attributes {
schema.validate(name, value)?;
}
let workflow_id = self.workflow_id.clone();
self.append_with(recorded_at, |envelope| Event::SearchAttributesUpdated {
envelope,
workflow_id,
attributes,
})
.await?;
self.upsert_visibility_projection_nonfatal().await;
Ok(())
}
pub async fn record_activity_scheduled(
&mut self,
recorded_at: DateTime<Utc>,
activity_id: ActivityId,
activity_type: String,
input: Payload,
) -> Result<(), DurabilityError> {
self.append_with(recorded_at, |envelope| Event::ActivityScheduled {
envelope,
activity_id,
activity_type,
input,
})
.await
}
pub async fn record_activity_started(
&mut self,
recorded_at: DateTime<Utc>,
activity_id: ActivityId,
) -> Result<(), DurabilityError> {
self.append_with(recorded_at, |envelope| Event::ActivityStarted {
envelope,
activity_id,
})
.await
}
pub async fn record_activity_completed(
&mut self,
recorded_at: DateTime<Utc>,
activity_id: ActivityId,
result: Payload,
) -> Result<(), DurabilityError> {
self.append_with(recorded_at, |envelope| Event::ActivityCompleted {
envelope,
activity_id,
result,
})
.await
}
pub async fn record_activity_failed(
&mut self,
recorded_at: DateTime<Utc>,
activity_id: ActivityId,
error: ActivityError,
attempt: u32,
) -> Result<(), DurabilityError> {
self.append_with(recorded_at, |envelope| Event::ActivityFailed {
envelope,
activity_id,
error,
attempt,
})
.await
}
pub async fn record_activity_cancelled(
&mut self,
recorded_at: DateTime<Utc>,
activity_id: ActivityId,
) -> Result<(), DurabilityError> {
self.append_with(recorded_at, |envelope| Event::ActivityCancelled {
envelope,
activity_id,
})
.await
}
pub async fn record_timer_started(
&mut self,
recorded_at: DateTime<Utc>,
timer_id: TimerId,
fire_at: DateTime<Utc>,
) -> Result<(), DurabilityError> {
self.append_with(recorded_at, |envelope| Event::TimerStarted {
envelope,
timer_id,
fire_at,
})
.await
}
pub async fn record_timer_fired(
&mut self,
recorded_at: DateTime<Utc>,
timer_id: TimerId,
) -> Result<(), DurabilityError> {
self.append_with(recorded_at, |envelope| Event::TimerFired {
envelope,
timer_id,
})
.await
}
pub async fn record_timer_cancelled(
&mut self,
recorded_at: DateTime<Utc>,
timer_id: TimerId,
) -> Result<(), DurabilityError> {
self.append_with(recorded_at, |envelope| Event::TimerCancelled {
envelope,
timer_id,
})
.await
}
pub async fn record_with_timeout_completed(
&mut self,
recorded_at: DateTime<Utc>,
timer_id: TimerId,
outcome: WithTimeoutOutcome,
result: Option<Payload>,
) -> Result<(), DurabilityError> {
self.append_with(recorded_at, |envelope| Event::WithTimeoutCompleted {
envelope,
timer_id,
outcome,
result,
})
.await
}
pub async fn record_signal_received(
&mut self,
recorded_at: DateTime<Utc>,
name: String,
payload: Payload,
) -> Result<(), DurabilityError> {
self.append_with(recorded_at, |envelope| Event::SignalReceived {
envelope,
name,
payload,
})
.await
}
pub async fn record_signal_sent(
&mut self,
recorded_at: DateTime<Utc>,
target_workflow_id: WorkflowId,
name: String,
payload: Payload,
) -> Result<(), DurabilityError> {
self.append_with(recorded_at, |envelope| Event::SignalSent {
envelope,
target_workflow_id,
name,
payload,
})
.await
}
pub async fn record_child_workflow_started(
&mut self,
recorded_at: DateTime<Utc>,
child_workflow_id: WorkflowId,
workflow_type: String,
input: Payload,
package_version: PackageVersion,
) -> Result<(), DurabilityError> {
self.append_with(recorded_at, |envelope| Event::ChildWorkflowStarted {
envelope,
child_workflow_id,
workflow_type,
input,
package_version,
})
.await
}
pub async fn record_child_workflow_completed(
&mut self,
recorded_at: DateTime<Utc>,
child_workflow_id: WorkflowId,
result: Payload,
) -> Result<(), DurabilityError> {
self.append_with(recorded_at, |envelope| Event::ChildWorkflowCompleted {
envelope,
child_workflow_id,
result,
})
.await
}
pub async fn record_child_workflow_failed(
&mut self,
recorded_at: DateTime<Utc>,
child_workflow_id: WorkflowId,
error: WorkflowError,
) -> Result<(), DurabilityError> {
self.append_with(recorded_at, |envelope| Event::ChildWorkflowFailed {
envelope,
child_workflow_id,
error,
})
.await
}
/// Builds the envelope for the next event without advancing the head.
///
/// Fails with `DurabilityError::HistoryShape` when the sequence head has no
/// next sequence number to hand out.
fn next_envelope(&self, recorded_at: DateTime<Utc>) -> Result<EventEnvelope, DurabilityError> {
    let Some(seq) = self.sequence.next_seq() else {
        return Err(DurabilityError::HistoryShape {
            reason: format!(
                "sequence head overflow advancing {} by 1",
                self.sequence.current()
            ),
        });
    };
    let workflow_id = self.workflow_id.clone();
    Ok(EventEnvelope {
        seq,
        recorded_at,
        workflow_id,
    })
}
/// Builds the envelope that directly follows `previous` in sequence order.
///
/// Used when several events are appended in one batch. Fails with
/// `DurabilityError::HistoryShape` when `previous.seq + 1` overflows.
fn envelope_after(
    &self,
    previous: &EventEnvelope,
    recorded_at: DateTime<Utc>,
) -> Result<EventEnvelope, DurabilityError> {
    match previous.seq.checked_add(1) {
        Some(seq) => Ok(EventEnvelope {
            seq,
            recorded_at,
            workflow_id: self.workflow_id.clone(),
        }),
        None => Err(DurabilityError::HistoryShape {
            reason: format!("sequence head overflow advancing {} by 1", previous.seq),
        }),
    }
}
/// Allocates the next envelope, lets `build_event` wrap it into an event,
/// and appends that single event.
///
/// `build_event` is not called when the envelope cannot be allocated.
async fn append_with(
    &mut self,
    recorded_at: DateTime<Utc>,
    build_event: impl FnOnce(EventEnvelope) -> Event,
) -> Result<(), DurabilityError> {
    let event = build_event(self.next_envelope(recorded_at)?);
    self.append_one(event).await
}
/// Appends one event to the store, expecting the store to be at the
/// recorder's current head, and advances the head by one on success.
///
/// The head is left untouched when the store rejects the append.
async fn append_one(&mut self, event: Event) -> Result<(), DurabilityError> {
    let batch = [event];
    let expected_seq = self.sequence.current();
    self.store
        .append(self.write_token, &self.workflow_id, &batch, expected_seq)
        .await?;
    self.sequence.mark_append_success(1)
}
/// Rebuilds the visibility record from the stored history and writes it to
/// the visibility store.
///
/// Does nothing and returns `Ok(())` when no visibility projection is
/// configured.
async fn upsert_visibility_projection(&self) -> Result<(), DurabilityError> {
    if let Some(visibility) = self.visibility.as_ref() {
        // Project from the full stored history rather than from local state.
        let history = self.store.read_history(&self.workflow_id).await?;
        let record = visibility_record_from_history(&history, &visibility.run_id)?;
        visibility.store.record_visibility(record).await?;
    }
    Ok(())
}
/// Runs the visibility upsert and downgrades any failure to a warning log.
///
/// The warning carries the workflow id, the configured run id (or
/// `"unknown"` when none is configured), and the error.
async fn upsert_visibility_projection_nonfatal(&self) {
    let Err(error) = self.upsert_visibility_projection().await else {
        return;
    };
    let run_id = self
        .visibility
        .as_ref()
        .map(|visibility| &visibility.run_id);
    tracing::warn!(
        workflow_id = %self.workflow_id,
        run_id = run_id.map(ToString::to_string).as_deref().unwrap_or("unknown"),
        error = %error,
        "visibility upsert failed after durable append; crash-consistency window remains until reconciliation repairs visibility"
    );
}
}
/// Projects a workflow history into a `VisibilityRecord` for `run_id`.
///
/// The workflow id, workflow type, and start time come from the first
/// `WorkflowStarted` event in `history`; status, close time, and search
/// attributes are derived from the whole history.
///
/// Fails with `DurabilityError::HistoryShape` when `history` contains no
/// `WorkflowStarted` event.
fn visibility_record_from_history(
    history: &[Event],
    run_id: &RunId,
) -> Result<VisibilityRecord, DurabilityError> {
    let first_start = history.iter().find_map(|event| {
        if let Event::WorkflowStarted {
            envelope,
            workflow_type,
            ..
        } = event
        {
            Some((envelope, workflow_type))
        } else {
            None
        }
    });
    let Some((envelope, workflow_type)) = first_start else {
        return Err(DurabilityError::HistoryShape {
            reason: String::from(
                "workflow history has no WorkflowStarted event for visibility projection",
            ),
        });
    };

    Ok(VisibilityRecord {
        workflow_id: envelope.workflow_id.clone(),
        run_id: run_id.clone(),
        workflow_type: workflow_type.clone(),
        status: aion_core::status_from_events(history),
        start_time: envelope.recorded_at,
        close_time: terminal_recorded_at(history),
        search_attributes: aion_core::search_attributes_from_events(history),
    })
}
/// Returns the `recorded_at` of the last terminal event in `history`.
///
/// Completed, failed, cancelled, timed-out, and continued-as-new events
/// count as terminal. Returns `None` when the history contains none of them.
fn terminal_recorded_at(history: &[Event]) -> Option<DateTime<Utc>> {
    // Walk backwards so the most recent terminal event wins.
    for event in history.iter().rev() {
        if let Event::WorkflowCompleted { envelope, .. }
        | Event::WorkflowFailed { envelope, .. }
        | Event::WorkflowCancelled { envelope, .. }
        | Event::WorkflowTimedOut { envelope, .. }
        | Event::WorkflowContinuedAsNew { envelope, .. } = event
        {
            return Some(envelope.recorded_at);
        }
    }
    None
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::io;
use std::sync::{Arc, Mutex};
use aion_core::RunId;
use aion_core::{
Event, Payload, SearchAttributeError, SearchAttributeSchema, SearchAttributeType,
SearchAttributeValue, TimerId,
};
use aion_store::visibility::{
ListWorkflowsFilter, VisibilityRecord, VisibilityStore, WorkflowSummary,
};
use aion_store::{
InMemoryStore, ReadableEventStore, StoreError, WritableEventStore, WriteToken,
};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use serde_json::json;
use tracing_subscriber::fmt::MakeWriter;
use super::Recorder;
use crate::durability::DurabilityError;
/// Visibility store test double: every write fails, every query is empty.
#[derive(Debug)]
struct FailingVisibilityStore;

#[async_trait]
impl VisibilityStore for FailingVisibilityStore {
    /// Always fails with a backend error reading "visibility unavailable".
    async fn record_visibility(&self, _record: VisibilityRecord) -> Result<(), StoreError> {
        let reason = String::from("visibility unavailable");
        Err(StoreError::Backend(reason))
    }

    /// Always reports no workflows.
    async fn list_workflows(
        &self,
        _filter: ListWorkflowsFilter,
    ) -> Result<Vec<WorkflowSummary>, StoreError> {
        let none: Vec<WorkflowSummary> = Vec::new();
        Ok(none)
    }

    /// Always reports a count of zero.
    async fn count_workflows(&self, _filter: ListWorkflowsFilter) -> Result<u64, StoreError> {
        let count: u64 = 0;
        Ok(count)
    }
}
/// Shared in-memory sink for log output; clones share the same buffer.
#[derive(Clone, Debug, Default)]
struct CapturedLogs {
    buffer: Arc<Mutex<Vec<u8>>>,
}

impl CapturedLogs {
    /// Returns everything captured so far, decoded as UTF-8.
    ///
    /// Fails when the buffer lock is poisoned or the bytes are not valid UTF-8.
    fn contents(&self) -> Result<String, Box<dyn std::error::Error>> {
        let guard = self
            .buffer
            .lock()
            .map_err(|error| format!("captured log buffer lock poisoned: {error}"))?;
        let snapshot = guard.clone();
        // Release the lock before decoding.
        drop(guard);
        let text = String::from_utf8(snapshot)?;
        Ok(text)
    }
}
/// Writer handle that appends every write to a shared byte buffer.
struct CapturedLogWriter {
    buffer: Arc<Mutex<Vec<u8>>>,
}

impl io::Write for CapturedLogWriter {
    /// Appends all of `bytes` to the shared buffer and reports them as written.
    fn write(&mut self, bytes: &[u8]) -> io::Result<usize> {
        let mut sink = match self.buffer.lock() {
            Ok(guard) => guard,
            Err(error) => {
                return Err(io::Error::other(format!(
                    "captured log lock poisoned: {error}"
                )));
            }
        };
        sink.extend_from_slice(bytes);
        Ok(bytes.len())
    }

    /// Nothing is buffered outside the shared vector, so there is nothing to flush.
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
/// Lets a `CapturedLogs` act as the writer factory of a tracing subscriber.
impl<'writer> MakeWriter<'writer> for CapturedLogs {
    type Writer = CapturedLogWriter;

    /// Hands out a writer that shares this value's buffer.
    fn make_writer(&'writer self) -> Self::Writer {
        let buffer = Arc::clone(&self.buffer);
        CapturedLogWriter { buffer }
    }
}
/// Builds a deterministic workflow id from `value`.
fn workflow_id(value: u128) -> aion_core::WorkflowId {
    let raw = uuid::Uuid::from_u128(value);
    aion_core::WorkflowId::new(raw)
}
/// Returns a fixed base timestamp shifted by `offset_seconds`.
///
/// Falls back to the default `DateTime` when the timestamp is out of range.
fn recorded_at(offset_seconds: i64) -> DateTime<Utc> {
    const BASE_EPOCH_SECONDS: i64 = 1_700_000_000;
    let seconds = BASE_EPOCH_SECONDS + offset_seconds;
    DateTime::from_timestamp(seconds, 0).unwrap_or_default()
}
/// Builds a payload from the JSON object `{"label": label}`.
fn payload(label: &str) -> Result<Payload, Box<dyn std::error::Error>> {
    let value = json!({ "label": label });
    let encoded = Payload::from_json(&value)?;
    Ok(encoded)
}
/// Builds a `WorkflowStarted` event at `seq` for seeding a store directly.
///
/// The event's timestamp is `recorded_at(seq)`; all other fields are fixed.
fn workflow_started(
    seq: u64,
    workflow_id: &aion_core::WorkflowId,
) -> Result<Event, Box<dyn std::error::Error>> {
    let offset_seconds = i64::try_from(seq)?;
    let envelope = aion_core::EventEnvelope {
        seq,
        recorded_at: recorded_at(offset_seconds),
        workflow_id: workflow_id.clone(),
    };
    let event = Event::WorkflowStarted {
        envelope,
        workflow_type: String::from("checkout"),
        input: payload("workflow-input")?,
        run_id: aion_core::RunId::new(uuid::Uuid::from_u128(1)),
        parent_run_id: None,
        package_version: aion_core::PackageVersion::new("a".repeat(64)),
    };
    Ok(event)
}
/// Two consecutive appends get sequence numbers 1 and 2, and the recorder's
/// head follows them.
#[tokio::test]
async fn recorder_advances_expected_sequence_between_appends()
-> Result<(), Box<dyn std::error::Error>> {
    let workflow_id = workflow_id(1);
    let store = Arc::new(InMemoryStore::default());
    let mut recorder = Recorder::new(workflow_id.clone(), store.clone());

    let start = super::WorkflowStartRecord {
        workflow_type: String::from("checkout"),
        input: payload("input")?,
        run_id: aion_core::RunId::new(uuid::Uuid::from_u128(1)),
        parent_run_id: None,
        package_version: aion_core::PackageVersion::new("a".repeat(64)),
    };
    recorder
        .record_workflow_started(recorded_at(1), start)
        .await?;

    let result = payload("result")?;
    recorder
        .record_workflow_completed(recorded_at(2), result)
        .await?;

    let history = store.read_history(&workflow_id).await?;
    assert_eq!(history[0].seq(), 1);
    assert_eq!(history[1].seq(), 2);
    assert_eq!(recorder.current_head(), 2);
    Ok(())
}
/// A continue-as-new record is stored as the second event with the input,
/// workflow type, parent run id, and timestamp that were passed in.
#[tokio::test]
async fn records_workflow_continued_as_new_terminal_event()
-> Result<(), Box<dyn std::error::Error>> {
    let workflow_id = workflow_id(2);
    let parent_run_id = aion_core::RunId::new(uuid::Uuid::from_u128(20));
    let store = Arc::new(InMemoryStore::default());
    let mut recorder = Recorder::new(workflow_id.clone(), store.clone());
    let continued_at = recorded_at(2);
    let continued_input = payload("continued-input")?;
    let workflow_type = Some(String::from("checkout-v2"));

    let start = super::WorkflowStartRecord {
        workflow_type: String::from("checkout"),
        input: payload("input")?,
        run_id: aion_core::RunId::new(uuid::Uuid::from_u128(1)),
        parent_run_id: None,
        package_version: aion_core::PackageVersion::new("a".repeat(64)),
    };
    recorder
        .record_workflow_started(recorded_at(1), start)
        .await?;
    recorder
        .record_workflow_continued_as_new(
            continued_at,
            continued_input.clone(),
            workflow_type.clone(),
            parent_run_id.clone(),
        )
        .await?;

    let history = store.read_history(&workflow_id).await?;
    assert_eq!(history.len(), 2);
    assert_eq!(history[0].seq(), 1);
    assert_eq!(history[1].seq(), 2);

    let other = &history[1];
    let Event::WorkflowContinuedAsNew {
        envelope,
        input,
        workflow_type: recorded_workflow_type,
        parent_run_id: recorded_parent_run_id,
    } = other
    else {
        return Err(format!("expected WorkflowContinuedAsNew, got {other:?}").into());
    };
    assert_eq!(envelope.recorded_at, continued_at);
    assert_eq!(input, &continued_input);
    assert_eq!(recorded_workflow_type, &workflow_type);
    assert_eq!(recorded_parent_run_id, &parent_run_id);

    assert_eq!(recorder.current_head(), 2);
    Ok(())
}
/// Attributes that match the schema are appended as a
/// `SearchAttributesUpdated` event right after the start event.
#[tokio::test]
async fn records_validated_search_attributes_updated_event()
-> Result<(), Box<dyn std::error::Error>> {
    let workflow_id = workflow_id(7);
    let store = Arc::new(InMemoryStore::default());
    let mut recorder = Recorder::new(workflow_id.clone(), store.clone());

    let mut schema = SearchAttributeSchema::new();
    schema.register("customer_id", SearchAttributeType::String)?;
    schema.register("attempt", SearchAttributeType::Int)?;

    let mut attributes = HashMap::new();
    attributes.insert(
        String::from("customer_id"),
        SearchAttributeValue::String(String::from("customer-123")),
    );
    attributes.insert(String::from("attempt"), SearchAttributeValue::Int(2));

    let start = super::WorkflowStartRecord {
        workflow_type: String::from("checkout"),
        input: payload("input")?,
        run_id: aion_core::RunId::new(uuid::Uuid::from_u128(1)),
        parent_run_id: None,
        package_version: aion_core::PackageVersion::new("a".repeat(64)),
    };
    recorder
        .record_workflow_started(recorded_at(1), start)
        .await?;
    recorder
        .record_search_attributes_updated(recorded_at(2), attributes.clone(), &schema)
        .await?;

    let history = store.read_history(&workflow_id).await?;
    let other = history.as_slice();
    let [
        Event::WorkflowStarted { .. },
        Event::SearchAttributesUpdated {
            envelope,
            workflow_id: recorded_workflow_id,
            attributes: stored_attributes,
        },
    ] = other
    else {
        return Err(format!("expected started then search attributes, found {other:?}").into());
    };
    assert_eq!(envelope.seq, 2);
    assert_eq!(recorded_workflow_id, &workflow_id);
    assert_eq!(stored_attributes, &attributes);

    assert_eq!(recorder.current_head(), 2);
    Ok(())
}
/// Starting with attributes stores the start event at seq 1 and the
/// attribute event at seq 2, and a later append continues at seq 3.
#[tokio::test]
async fn started_with_attributes_appends_both_events_in_one_atomic_batch()
-> Result<(), Box<dyn std::error::Error>> {
    let workflow_id = workflow_id(11);
    let run_id = aion_core::RunId::new(uuid::Uuid::from_u128(1));
    let store = Arc::new(InMemoryStore::default());
    let mut recorder = Recorder::new(workflow_id.clone(), store.clone());

    let mut schema = SearchAttributeSchema::new();
    schema.register("aion.namespace", SearchAttributeType::String)?;
    let mut attributes = HashMap::new();
    attributes.insert(
        String::from("aion.namespace"),
        SearchAttributeValue::String(String::from("tenant-a")),
    );

    let start = super::WorkflowStartRecord {
        workflow_type: String::from("checkout"),
        input: payload("input")?,
        run_id: run_id.clone(),
        parent_run_id: None,
        package_version: aion_core::PackageVersion::new("a".repeat(64)),
    };
    recorder
        .record_workflow_started_with_attributes(
            recorded_at(1),
            start,
            attributes.clone(),
            &schema,
        )
        .await?;

    let history = store.read_history(&workflow_id).await?;
    let other = history.as_slice();
    let [
        Event::WorkflowStarted {
            envelope: started_envelope,
            run_id: started_run_id,
            ..
        },
        Event::SearchAttributesUpdated {
            envelope: attributes_envelope,
            workflow_id: recorded_workflow_id,
            attributes: stored_attributes,
        },
    ] = other
    else {
        return Err(format!("expected started then search attributes, found {other:?}").into());
    };
    assert_eq!(started_envelope.seq, 1);
    assert_eq!(started_run_id, &run_id);
    assert_eq!(attributes_envelope.seq, 2);
    assert_eq!(recorded_workflow_id, &workflow_id);
    assert_eq!(stored_attributes, &attributes);
    assert_eq!(recorder.current_head(), 2);

    // The head must have advanced by two so the next append lands at seq 3.
    let result = payload("result")?;
    recorder
        .record_workflow_completed(recorded_at(3), result)
        .await?;
    let history = store.read_history(&workflow_id).await?;
    assert_eq!(history.len(), 3);
    assert_eq!(history[2].seq(), 3);
    Ok(())
}
/// An attribute missing from the schema is rejected before anything is
/// appended, and the head stays at 0.
#[tokio::test]
async fn started_with_invalid_attributes_appends_nothing()
-> Result<(), Box<dyn std::error::Error>> {
    let workflow_id = workflow_id(12);
    let store = Arc::new(InMemoryStore::default());
    let mut recorder = Recorder::new(workflow_id.clone(), store.clone());

    // Empty schema: "aion.namespace" is deliberately not registered.
    let schema = SearchAttributeSchema::new();
    let mut attributes = HashMap::new();
    attributes.insert(
        String::from("aion.namespace"),
        SearchAttributeValue::String(String::from("tenant-a")),
    );

    let start = super::WorkflowStartRecord {
        workflow_type: String::from("checkout"),
        input: payload("input")?,
        run_id: aion_core::RunId::new(uuid::Uuid::from_u128(1)),
        parent_run_id: None,
        package_version: aion_core::PackageVersion::new("a".repeat(64)),
    };
    let result = recorder
        .record_workflow_started_with_attributes(recorded_at(1), start, attributes, &schema)
        .await;

    let rejected_as_unregistered = matches!(
        result,
        Err(DurabilityError::SearchAttribute(
            SearchAttributeError::UnregisteredAttribute { name }
        )) if name == "aion.namespace"
    );
    assert!(rejected_as_unregistered);

    let history = store.read_history(&workflow_id).await?;
    assert!(history.is_empty());
    assert_eq!(recorder.current_head(), 0);
    Ok(())
}
/// With an empty attribute map only the start event is written.
#[tokio::test]
async fn started_with_empty_attributes_appends_only_the_start_event()
-> Result<(), Box<dyn std::error::Error>> {
    let workflow_id = workflow_id(13);
    let store = Arc::new(InMemoryStore::default());
    let mut recorder = Recorder::new(workflow_id.clone(), store.clone());
    let schema = SearchAttributeSchema::new();

    let start = super::WorkflowStartRecord {
        workflow_type: String::from("checkout"),
        input: payload("input")?,
        run_id: aion_core::RunId::new(uuid::Uuid::from_u128(1)),
        parent_run_id: None,
        package_version: aion_core::PackageVersion::new("a".repeat(64)),
    };
    let no_attributes = HashMap::new();
    recorder
        .record_workflow_started_with_attributes(recorded_at(1), start, no_attributes, &schema)
        .await?;

    let history = store.read_history(&workflow_id).await?;
    let only_start_event = matches!(history.as_slice(), [Event::WorkflowStarted { .. }]);
    assert!(only_start_event);
    assert_eq!(recorder.current_head(), 1);
    Ok(())
}
/// A value of the wrong type is rejected with `TypeMismatch`, and neither
/// the store nor the recorder's head changes.
#[tokio::test]
async fn invalid_search_attributes_return_error_without_appending()
-> Result<(), Box<dyn std::error::Error>> {
    let workflow_id = workflow_id(8);
    let store = Arc::new(InMemoryStore::default());
    let mut recorder = Recorder::new(workflow_id.clone(), store.clone());
    let mut schema = SearchAttributeSchema::new();
    schema.register("attempt", SearchAttributeType::Int)?;

    let start = super::WorkflowStartRecord {
        workflow_type: String::from("checkout"),
        input: payload("input")?,
        run_id: aion_core::RunId::new(uuid::Uuid::from_u128(1)),
        parent_run_id: None,
        package_version: aion_core::PackageVersion::new("a".repeat(64)),
    };
    recorder
        .record_workflow_started(recorded_at(1), start)
        .await?;

    // "attempt" is registered as Int, so a String value must be rejected.
    let mut attributes = HashMap::new();
    attributes.insert(
        String::from("attempt"),
        SearchAttributeValue::String(String::from("two")),
    );
    let outcome = recorder
        .record_search_attributes_updated(recorded_at(2), attributes, &schema)
        .await;

    match outcome {
        Ok(()) => return Err("expected search attribute validation error".into()),
        Err(DurabilityError::SearchAttribute(SearchAttributeError::TypeMismatch {
            name,
            expected,
            actual,
        })) => {
            assert_eq!(name, "attempt");
            assert_eq!(expected, SearchAttributeType::Int);
            assert_eq!(actual, SearchAttributeType::String);
        }
        Err(other) => {
            return Err(format!("expected search attribute error, got {other:?}").into());
        }
    }

    assert_eq!(recorder.current_head(), 1);
    let history = store.read_history(&workflow_id).await?;
    assert_eq!(history.len(), 1);
    Ok(())
}
/// With a visibility store attached, recording search attributes makes the
/// workflow listable with the configured run id and those attributes.
#[tokio::test]
async fn recorder_visibility_updates_after_search_attributes()
-> Result<(), Box<dyn std::error::Error>> {
    let workflow_id = workflow_id(9);
    let run_id = aion_core::RunId::new(uuid::Uuid::from_u128(90));
    let store = Arc::new(InMemoryStore::default());
    // The same in-memory store serves as both event store and visibility store.
    let mut recorder = Recorder::new(workflow_id.clone(), store.clone())
        .with_visibility(run_id.clone(), store.clone());

    let mut schema = SearchAttributeSchema::new();
    schema.register("customer_id", SearchAttributeType::String)?;
    let mut attributes = HashMap::new();
    attributes.insert(
        String::from("customer_id"),
        SearchAttributeValue::String(String::from("customer-123")),
    );

    let start = super::WorkflowStartRecord {
        workflow_type: String::from("checkout"),
        input: payload("input")?,
        run_id: aion_core::RunId::new(uuid::Uuid::from_u128(1)),
        parent_run_id: None,
        package_version: aion_core::PackageVersion::new("a".repeat(64)),
    };
    recorder
        .record_workflow_started(recorded_at(1), start)
        .await?;
    recorder
        .record_search_attributes_updated(recorded_at(2), attributes.clone(), &schema)
        .await?;

    let summaries = store.list_workflows(ListWorkflowsFilter::default()).await?;
    assert_eq!(summaries.len(), 1);
    let summary = &summaries[0];
    assert_eq!(summary.workflow_id, workflow_id);
    assert_eq!(summary.run_id, run_id);
    assert_eq!(summary.search_attributes, attributes);
    Ok(())
}
/// A failing visibility store must not fail the record call: the event is
/// still appended, the head still advances, and a warning is logged.
#[tokio::test]
async fn visibility_upsert_failure_after_append_logs_warning_and_succeeds()
-> Result<(), Box<dyn std::error::Error>> {
let workflow_id = workflow_id(10);
let run_id = RunId::new(uuid::Uuid::from_u128(100));
let store = Arc::new(InMemoryStore::default());
let visibility_store = Arc::new(FailingVisibilityStore);
// Route WARN-level tracing output into an in-memory buffer so it can be inspected.
let logs = CapturedLogs::default();
let subscriber = tracing_subscriber::fmt()
.with_max_level(tracing::Level::WARN)
.with_writer(logs.clone())
.without_time()
.finish();
let dispatch = tracing::Dispatch::new(subscriber);
// The capturing subscriber stays the default dispatcher until `guard` is dropped.
let guard = tracing::dispatcher::set_default(&dispatch);
let mut recorder = Recorder::new(workflow_id.clone(), store.clone())
.with_visibility(run_id.clone(), visibility_store);
let mut schema = SearchAttributeSchema::new();
schema.register("customer_id", SearchAttributeType::String)?;
let attributes = HashMap::from([(
String::from("customer_id"),
SearchAttributeValue::String(String::from("customer-123")),
)]);
recorder
.record_workflow_started(
recorded_at(1),
super::WorkflowStartRecord {
workflow_type: String::from("checkout"),
input: payload("input")?,
run_id: run_id.clone(),
parent_run_id: None,
package_version: aion_core::PackageVersion::new("a".repeat(64)),
},
)
.await?;
// This call appends the event and then attempts the visibility upsert, which fails.
recorder
.record_search_attributes_updated(recorded_at(2), attributes, &schema)
.await?;
// Stop capturing before asserting on the captured output.
drop(guard);
// The append itself succeeded despite the visibility failure.
let history = store.read_history(&workflow_id).await?;
assert_eq!(history.len(), 2);
assert_eq!(recorder.current_head(), 2);
let output = logs.contents()?;
assert!(
output.contains("visibility upsert failed after durable append"),
"captured warning did not include visibility-upsert failure message: {output}"
);
assert!(
output.contains("crash-consistency window"),
"captured warning did not name the crash-consistency window: {output}"
);
// The warning must identify the workflow, the run, and the underlying error.
assert!(output.contains(&workflow_id.to_string()));
assert!(output.contains(&run_id.to_string()));
assert!(output.contains("visibility unavailable"));
Ok(())
}
/// Scheduled and completed activity events are stored in order, each
/// carrying the activity id that was recorded.
#[tokio::test]
async fn records_activity_events_in_sequence_order() -> Result<(), Box<dyn std::error::Error>> {
    let workflow_id = workflow_id(6);
    let store = Arc::new(InMemoryStore::default());
    let mut recorder = Recorder::new(workflow_id.clone(), store.clone());
    let activity_id = aion_core::ActivityId::from_sequence_position(1);

    let input = payload("input")?;
    recorder
        .record_activity_scheduled(
            recorded_at(1),
            activity_id.clone(),
            String::from("charge-card"),
            input,
        )
        .await?;
    let result = payload("result")?;
    recorder
        .record_activity_completed(recorded_at(2), activity_id.clone(), result)
        .await?;

    let history = store.read_history(&workflow_id).await?;
    assert_eq!(history.len(), 2);
    assert_eq!(history[0].seq(), 1);
    assert_eq!(history[1].seq(), 2);

    let other = &history[0];
    let Event::ActivityScheduled {
        activity_id: recorded_activity_id,
        ..
    } = other
    else {
        return Err(format!("expected ActivityScheduled, got {other:?}").into());
    };
    assert_eq!(recorded_activity_id, &activity_id);

    let other = &history[1];
    let Event::ActivityCompleted {
        activity_id: recorded_activity_id,
        ..
    } = other
    else {
        return Err(format!("expected ActivityCompleted, got {other:?}").into());
    };
    assert_eq!(recorded_activity_id, &activity_id);
    Ok(())
}
/// A recorder resumed at head 2 appends its next event at seq 3.
#[tokio::test]
async fn resume_at_continues_from_existing_history_head()
-> Result<(), Box<dyn std::error::Error>> {
    let workflow_id = workflow_id(3);
    let store = Arc::new(InMemoryStore::default());

    // Seed two events directly so the store's head is already at 2.
    let first = workflow_started(1, &workflow_id)?;
    let second = workflow_started(2, &workflow_id)?;
    let seeded = [first, second];
    store
        .append(WriteToken::recorder(), &workflow_id, &seeded, 0)
        .await?;

    let mut recorder = Recorder::resume_at(workflow_id.clone(), store.clone(), 2);
    let signal = payload("signal")?;
    recorder
        .record_signal_received(recorded_at(3), String::from("approve"), signal)
        .await?;

    let history = store.read_history(&workflow_id).await?;
    assert_eq!(history.len(), 3);
    assert_eq!(history[2].seq(), 3);
    assert_eq!(recorder.current_head(), 3);
    Ok(())
}
/// When another writer has already advanced the store, the recorder's
/// append surfaces a sequence conflict and leaves head and history alone.
#[tokio::test]
async fn sequence_conflict_surfaces_without_advancing_or_retrying()
-> Result<(), Box<dyn std::error::Error>> {
    let workflow_id = workflow_id(4);
    let store = Arc::new(InMemoryStore::default());
    let mut recorder = Recorder::new(workflow_id.clone(), store.clone());

    // Write behind the recorder's back so its expected sequence (0) is stale.
    let rogue_event = workflow_started(1, &workflow_id)?;
    store
        .append(WriteToken::recorder(), &workflow_id, &[rogue_event], 0)
        .await?;

    let outcome = recorder
        .record_timer_fired(recorded_at(2), TimerId::anonymous(2))
        .await;
    match outcome {
        Ok(()) => return Err("expected sequence conflict".into()),
        Err(DurabilityError::Store(StoreError::SequenceConflict { expected, found })) => {
            assert_eq!(expected, 0);
            assert_eq!(found, 1);
        }
        Err(other) => return Err(format!("expected sequence conflict, got {other:?}").into()),
    }

    assert_eq!(recorder.current_head(), 0);
    let history = store.read_history(&workflow_id).await?;
    assert_eq!(history.len(), 1);
    assert_eq!(history[0].seq(), 1);
    Ok(())
}
/// A recorder at `u64::MAX` cannot allocate another sequence number: the
/// call fails with `HistoryShape` and nothing is appended.
#[tokio::test]
async fn sequence_overflow_returns_error_without_appending()
-> Result<(), Box<dyn std::error::Error>> {
    let workflow_id = workflow_id(5);
    let store = Arc::new(InMemoryStore::default());
    let mut recorder = Recorder::resume_at(workflow_id.clone(), store.clone(), u64::MAX);

    let result = payload("result")?;
    let outcome = recorder
        .record_workflow_completed(recorded_at(1), result)
        .await;
    match outcome {
        Ok(()) => return Err("expected sequence overflow".into()),
        Err(DurabilityError::HistoryShape { reason }) => {
            assert!(reason.contains("sequence head overflow"));
        }
        Err(other) => return Err(format!("expected sequence overflow, got {other:?}").into()),
    }

    assert_eq!(recorder.current_head(), u64::MAX);
    let history = store.read_history(&workflow_id).await?;
    assert!(history.is_empty());
    Ok(())
}
}