use crate::{
event_store::EventStore,
ids::{ProcessId, StreamId, TenantId},
snapshot::{Snapshot, SnapshotStore},
version::WorkflowId,
workflow::Workflow,
};
/// Failure recorded while migrating a single stream.
///
/// `MigrationRunner` also uses this type to report a failed `list_streams`
/// call; in that case `stream_id` is the placeholder `"(list_streams)"`.
#[derive(Debug, Clone)]
pub struct MigrationError {
/// Stream the failure is attributed to.
pub stream_id: StreamId,
/// Human-readable description of what went wrong.
pub message: String,
}
/// Renders the error as `migration error on stream <stream_id>: <message>`.
impl std::fmt::Display for MigrationError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Pull both fields out up front so the format call reads as a plain template.
        let Self { stream_id, message } = self;
        write!(f, "migration error on stream {}: {}", stream_id, message)
    }
}
// Marker impl with no overridden methods: lets `MigrationError` be used
// wherever a `std::error::Error` is expected.
impl std::error::Error for MigrationError {}
/// Describes how to convert the state of one workflow into the state of another.
///
/// `MigrationRunner` replays a stream's events through `FromWorkflow`, passes
/// the resulting state to [`StateMigration::migrate`], and stores the returned
/// state as a snapshot tagged with `ToWorkflow::state_schema_version()`.
pub trait StateMigration: Send + Sync + 'static {
/// Workflow whose `upcast`/`apply` are used to rebuild the state to migrate.
type FromWorkflow: Workflow;
/// Workflow whose state type and state schema version the snapshot uses.
type ToWorkflow: Workflow;
/// Workflow id the first event of a stream must carry for the stream to be
/// migrated; streams whose first event differs are skipped by the runner.
fn source_workflow_id(&self) -> &WorkflowId;
/// Workflow id that `MigrationRunner::run_and_update_registry` writes into
/// the registry entry of a migrated process.
fn target_workflow_id(&self) -> &WorkflowId;
/// Converts a fully replayed source state into the target state.
///
/// # Errors
///
/// Returns a message describing the failure. The runner stores it verbatim
/// as the `message` of a `MigrationError` for the affected stream.
fn migrate(
&self,
state: <Self::FromWorkflow as Workflow>::State,
) -> Result<<Self::ToWorkflow as Workflow>::State, String>;
}
/// Summary of a migration run produced by `MigrationRunner`.
#[derive(Debug, Default)]
pub struct MigrationReport {
/// Number of streams for which a migrated snapshot was saved.
pub migrated: usize,
/// Number of streams left untouched (no events, or the first event belongs
/// to a different workflow than the migration's source).
pub skipped: usize,
/// Every failure collected during the run; the run continues past
/// per-stream failures.
pub errors: Vec<MigrationError>,
}
impl MigrationReport {
    /// Returns `true` when the run recorded no errors.
    ///
    /// The `migrated` and `skipped` counters do not influence the result.
    #[must_use]
    pub fn is_ok(&self) -> bool {
        let Self { errors, .. } = self;
        errors.is_empty()
    }
}
/// Renders a one-line summary: the two counters plus the number of errors
/// (the errors themselves are not printed).
impl std::fmt::Display for MigrationReport {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            migrated,
            skipped,
            errors,
        } = self;
        let error_count = errors.len();
        write!(
            f,
            "MigrationReport {{ migrated: {}, skipped: {}, errors: {} }}",
            migrated, skipped, error_count,
        )
    }
}
/// Extracts the tenant and process ids from a stream id shaped like
/// `process/<tenant-uuid>/<process-uuid>`.
///
/// Returns `None` when the `process/` prefix is missing, when there is no `/`
/// after the prefix, or when either segment does not parse as a UUID. The
/// remainder is split at the first `/` only, so everything after it is
/// treated as the process segment.
fn parse_process_stream_id(stream_id: &str) -> Option<(TenantId, ProcessId)> {
    let (tenant_part, process_part) = stream_id.strip_prefix("process/")?.split_once('/')?;
    match (
        uuid::Uuid::parse_str(tenant_part),
        uuid::Uuid::parse_str(process_part),
    ) {
        (Ok(tenant_uuid), Ok(process_uuid)) => Some((
            TenantId::from_uuid(tenant_uuid),
            ProcessId::from_uuid(process_uuid),
        )),
        _ => None,
    }
}
/// Applies a state migration to every stream returned by an event store and
/// saves the migrated state to a snapshot store.
///
/// The trait bounds on `M`, `ES` and `SS` live on the `impl` block rather than
/// on the struct.
pub struct MigrationRunner<M, ES, SS> {
/// The migration to apply (a `StateMigration` in the impl).
migration: M,
/// Source of the stream list and of each stream's events.
event_store: ES,
/// Destination for the migrated snapshots.
snap_store: SS,
}
impl<M, ES, SS> MigrationRunner<M, ES, SS>
where
M: StateMigration,
<M::FromWorkflow as Workflow>::State: serde::de::DeserializeOwned,
<M::ToWorkflow as Workflow>::State: serde::Serialize,
ES: EventStore,
SS: SnapshotStore,
{
#[must_use]
pub fn new(migration: M, event_store: ES, snap_store: SS) -> Self {
Self {
migration,
event_store,
snap_store,
}
}
pub async fn run(&self) -> MigrationReport {
let streams = match self.event_store.list_streams(None).await {
Ok(s) => s,
Err(e) => {
return MigrationReport {
errors: vec![MigrationError {
stream_id: StreamId::new("(list_streams)"),
message: format!("list_streams failed: {e}"),
}],
..Default::default()
};
}
};
let mut report = MigrationReport::default();
for stream_id in streams {
match self.migrate_stream(&stream_id).await {
Ok(true) => report.migrated += 1,
Ok(false) => report.skipped += 1,
Err(err) => report.errors.push(err),
}
}
report
}
pub async fn run_and_update_registry<R>(&self, registry: &R) -> MigrationReport
where
R: crate::registry::ProcessRegistry,
{
let streams = match self.event_store.list_streams(None).await {
Ok(s) => s,
Err(e) => {
return MigrationReport {
errors: vec![MigrationError {
stream_id: StreamId::new("(list_streams)"),
message: format!("list_streams failed: {e}"),
}],
..Default::default()
};
}
};
let mut report = MigrationReport::default();
for stream_id in streams {
match self.migrate_stream(&stream_id).await {
Ok(false) => {
report.skipped += 1;
}
Ok(true) => {
report.migrated += 1;
if let Some((tenant_id, process_id)) =
parse_process_stream_id(stream_id.as_str())
{
let key = crate::registry::RegistryKey::from_process(process_id);
match registry.lookup(tenant_id, &key).await {
Ok(Some(mut identity)) => {
identity.workflow_id = self.migration.target_workflow_id().clone();
if let Err(e) = registry.register(tenant_id, &key, identity).await {
report.errors.push(MigrationError {
stream_id: stream_id.clone(),
message: format!(
"registry update failed for process {process_id}: {e}"
),
});
}
}
Ok(None) => {
}
Err(e) => {
report.errors.push(MigrationError {
stream_id: stream_id.clone(),
message: format!(
"registry lookup failed for process {process_id}: {e}"
),
});
}
}
} else {
tracing::warn!(
stream_id = stream_id.as_str(),
"run_and_update_registry: cannot parse tenant/process from \
stream_id — registry update skipped for this stream",
);
}
}
Err(err) => {
report.errors.push(err);
}
}
}
report
}
async fn migrate_stream(&self, stream_id: &StreamId) -> Result<bool, MigrationError> {
let events = self
.event_store
.load(stream_id)
.await
.map_err(|e| MigrationError {
stream_id: stream_id.clone(),
message: format!("event load failed: {e}"),
})?;
let Some(first) = events.first() else {
return Ok(false);
};
if &first.workflow_id != self.migration.source_workflow_id() {
return Ok(false);
}
let mut state = <M::FromWorkflow as Workflow>::State::default();
let last_seq = events.last().map_or(0, |e| e.sequence_number);
for env in events {
let payload = M::FromWorkflow::upcast(&env.event_type, env.schema_version, env.payload)
.map_err(|e| MigrationError {
stream_id: stream_id.clone(),
message: format!("upcast failed on seq {}: {e}", env.sequence_number),
})?;
let event: <M::FromWorkflow as Workflow>::Event = serde_json::from_value(payload)
.map_err(|e| MigrationError {
stream_id: stream_id.clone(),
message: format!("event deserialize failed: {e}"),
})?;
state = M::FromWorkflow::apply(state, &event);
}
let new_state = self
.migration
.migrate(state)
.map_err(|msg| MigrationError {
stream_id: stream_id.clone(),
message: msg,
})?;
let payload = serde_json::to_value(&new_state).map_err(|e| MigrationError {
stream_id: stream_id.clone(),
message: format!("state serialization failed: {e}"),
})?;
let snap = Snapshot::new(
stream_id.clone(),
last_seq,
M::ToWorkflow::state_schema_version(),
payload,
);
self.snap_store
.save(&snap)
.await
.map_err(|e| MigrationError {
stream_id: stream_id.clone(),
message: format!("snapshot save failed: {e}"),
})?;
Ok(true)
}
}
// Unit tests for `MigrationRunner::run`, using two toy "counter" workflows
// (V1 -> V2) and the in-memory / no-op stores provided by the crate.
#[cfg(test)]
mod tests {
use super::*;
use crate::{
envelope::NewEvent,
event_store::{ExpectedVersion, InMemoryEventStore},
ids::{ConversationId, CorrelationId, ProcessId, StreamId, TenantId},
snapshot::NoopSnapshotStore,
version::WorkflowId,
workflow::{CommandPayload, EventPayload, Workflow},
};
// Source-side state: just a counter.
#[derive(Default, Clone, serde::Serialize, serde::Deserialize, PartialEq, Debug)]
struct CounterStateV1 {
count: u32,
}
// Source-side event: the only thing that can happen is an increment.
#[derive(Clone, serde::Serialize, serde::Deserialize)]
enum CounterEventV1 {
Incremented,
}
impl EventPayload for CounterEventV1 {
fn event_type(&self) -> &'static str {
"Incremented"
}
}
// Uninhabited command type: the migration tests never dispatch commands.
#[derive(Clone)]
enum CounterCommandV1 {}
impl CommandPayload for CounterCommandV1 {}
// Source workflow: each `Incremented` event adds one to `count`.
struct CounterWorkflowV1;
impl Workflow for CounterWorkflowV1 {
type State = CounterStateV1;
type Event = CounterEventV1;
type Command = CounterCommandV1;
fn handle(
_state: &Self::State,
_cmd: Self::Command,
) -> Result<crate::workflow::WorkflowOutput<Self::Event>, crate::error::WorkflowError>
{
unreachable!("not used in migration tests")
}
fn apply(mut state: Self::State, event: &Self::Event) -> Self::State {
match event {
CounterEventV1::Incremented => state.count += 1,
}
state
}
}
// Target-side state: the counter plus a label that V1 does not have.
#[derive(Default, Clone, serde::Serialize, serde::Deserialize, PartialEq, Debug)]
struct CounterStateV2 {
count: u32,
label: String,
}
#[derive(Clone, serde::Serialize, serde::Deserialize)]
enum CounterEventV2 {
Incremented,
}
impl EventPayload for CounterEventV2 {
fn event_type(&self) -> &'static str {
"Incremented"
}
}
// Uninhabited command type, as for V1.
#[derive(Clone)]
enum CounterCommandV2 {}
impl CommandPayload for CounterCommandV2 {}
// Target workflow: same increment behaviour, state schema version 2.
struct CounterWorkflowV2;
impl Workflow for CounterWorkflowV2 {
type State = CounterStateV2;
type Event = CounterEventV2;
type Command = CounterCommandV2;
fn handle(
_state: &Self::State,
_cmd: Self::Command,
) -> Result<crate::workflow::WorkflowOutput<Self::Event>, crate::error::WorkflowError>
{
unreachable!("not used in migration tests")
}
fn apply(mut state: Self::State, event: &Self::Event) -> Self::State {
match event {
CounterEventV2::Incremented => state.count += 1,
}
state
}
// The runner stamps saved snapshots with this value; the tests assert on it.
fn state_schema_version() -> u32 {
2
}
}
// Migration under test: carries `count` over and sets `label` to "migrated".
struct V1ToV2;
impl StateMigration for V1ToV2 {
type FromWorkflow = CounterWorkflowV1;
type ToWorkflow = CounterWorkflowV2;
// The trait returns `&WorkflowId`, so each id is built once and kept in a static.
fn source_workflow_id(&self) -> &WorkflowId {
static WID: std::sync::OnceLock<WorkflowId> = std::sync::OnceLock::new();
WID.get_or_init(|| WorkflowId::new("counter", "FV2024-10-01"))
}
fn target_workflow_id(&self) -> &WorkflowId {
static WID: std::sync::OnceLock<WorkflowId> = std::sync::OnceLock::new();
WID.get_or_init(|| WorkflowId::new("counter", "FV2025-04-01"))
}
fn migrate(&self, state: CounterStateV1) -> Result<CounterStateV2, String> {
Ok(CounterStateV2 {
count: state.count,
label: "migrated".into(),
})
}
}
// Builds an "Incremented" event (schema version 1, V1 payload) tagged with
// `workflow_id`. Process, tenant, correlation and conversation ids are newly
// constructed on every call.
fn make_increment_event(workflow_id: WorkflowId) -> NewEvent {
let pid = ProcessId::new();
let tid = TenantId::new();
NewEvent {
correlation_id: CorrelationId::new(),
causation_id: None,
conversation_id: ConversationId::new(),
process_id: pid,
tenant_id: tid,
workflow_id,
event_type: "Incremented".into(),
schema_version: 1,
payload: serde_json::to_value(CounterEventV1::Incremented).unwrap(),
}
}
// A stream whose events carry the source workflow id is replayed (three
// increments -> count 3), migrated, and saved as a schema-version-2 snapshot.
#[tokio::test]
async fn migrate_matching_stream() {
let store = InMemoryEventStore::default();
let snaps = crate::snapshot::InMemorySnapshotStore::new();
let sid = StreamId::new("process/counter-001");
let wid_v1 = WorkflowId::new("counter", "FV2024-10-01");
store
.append(
&sid,
ExpectedVersion::Any,
&[
make_increment_event(wid_v1.clone()),
make_increment_event(wid_v1.clone()),
make_increment_event(wid_v1.clone()),
],
)
.await
.unwrap();
// The runner takes ownership of its stores; a clone of `snaps` is handed
// over so the test can still read from `snaps` afterwards.
let runner = MigrationRunner::new(V1ToV2, store, snaps.clone());
let report = runner.run().await;
assert!(report.is_ok(), "errors: {:?}", report.errors);
assert_eq!(report.migrated, 1);
assert_eq!(report.skipped, 0);
let snap = snaps.load(&sid).await.unwrap().expect("snapshot saved");
assert_eq!(snap.state_schema_version, 2);
let state: CounterStateV2 = serde_json::from_value(snap.state).unwrap();
assert_eq!(state.count, 3);
assert_eq!(state.label, "migrated");
}
// A stream whose first event carries a different workflow id (here the target
// id) is counted as skipped and produces no error.
#[tokio::test]
async fn skip_non_matching_stream() {
let store = InMemoryEventStore::default();
let snaps = NoopSnapshotStore;
let sid = StreamId::new("process/counter-other");
let wid_v2 = WorkflowId::new("counter", "FV2025-04-01");
store
.append(&sid, ExpectedVersion::Any, &[make_increment_event(wid_v2)])
.await
.unwrap();
let runner = MigrationRunner::new(V1ToV2, store, snaps);
let report = runner.run().await;
assert!(report.is_ok());
assert_eq!(report.migrated, 0);
assert_eq!(report.skipped, 1);
}
// Nothing is appended to the store here, so the expected report has both
// counters at zero and no errors.
#[tokio::test]
async fn skip_empty_stream() {
let store = InMemoryEventStore::default();
let snaps = NoopSnapshotStore;
let runner = MigrationRunner::new(V1ToV2, store, snaps);
let report = runner.run().await;
assert!(report.is_ok());
assert_eq!(report.migrated, 0);
assert_eq!(report.skipped, 0);
}
// An `Err` from `StateMigration::migrate` ends up in `report.errors` with the
// migration's message, and the stream is not counted as migrated.
#[tokio::test]
async fn migration_fn_error_is_recorded_not_fatal() {
// Same workflow ids as `V1ToV2`, but `migrate` always fails.
struct FailingMigration;
impl StateMigration for FailingMigration {
type FromWorkflow = CounterWorkflowV1;
type ToWorkflow = CounterWorkflowV2;
fn source_workflow_id(&self) -> &WorkflowId {
static WID: std::sync::OnceLock<WorkflowId> = std::sync::OnceLock::new();
WID.get_or_init(|| WorkflowId::new("counter", "FV2024-10-01"))
}
fn target_workflow_id(&self) -> &WorkflowId {
static WID: std::sync::OnceLock<WorkflowId> = std::sync::OnceLock::new();
WID.get_or_init(|| WorkflowId::new("counter", "FV2025-04-01"))
}
fn migrate(&self, _state: CounterStateV1) -> Result<CounterStateV2, String> {
Err("intentional test failure".into())
}
}
let store = InMemoryEventStore::default();
let snaps = NoopSnapshotStore;
let sid = StreamId::new("process/failing");
let wid_v1 = WorkflowId::new("counter", "FV2024-10-01");
store
.append(&sid, ExpectedVersion::Any, &[make_increment_event(wid_v1)])
.await
.unwrap();
let runner = MigrationRunner::new(FailingMigration, store, snaps);
let report = runner.run().await;
assert!(!report.is_ok());
assert_eq!(report.errors.len(), 1);
assert_eq!(report.migrated, 0);
assert!(
report.errors[0]
.message
.contains("intentional test failure")
);
}
}