mako_engine/migration.rs
1//! In-flight process state migration across BDEW format-version boundaries.
2//!
3//! When an old BDEW format version (FV) is removed from the adapter registry
4//! after its grace period, any process initiated under that FV can no longer
5//! receive new events — the `ForwardCompatible` policy rejects FV mismatches
6//! at dispatch time. [`StateMigration`] + [`MigrationRunner`] provide the
7//! tooling to advance those processes to a newer FV *before* the old FV is
8//! retired.
9//!
10//! # How it works
11//!
12//! 1. **Scan** all event streams via [`EventStore::list_streams`].
13//! 2. **Filter** streams whose first event carries
14//! `workflow_id == migration.source_workflow_id()`.
15//! 3. **Replay** each matched stream using `FromWorkflow::apply` to reconstruct
16//! the fully-folded state.
17//! 4. **Migrate** the state via [`StateMigration::migrate`].
18//! 5. **Snapshot** the migrated state under `target_workflow_id`'s schema version so
19//! the process can continue executing under the new workflow definition without
20//! replaying old incompatible events.
21//!
22//! # Deployment sequence
23//!
24//! 1. Deploy the new binary with **both** FVs still registered in the adapter
25//! registry.
//! 2. Run `MigrationRunner::run_and_update_registry(&registry)` — all in-flight
27//! processes are migrated and their routing-table entries are rewritten to use
28//! the new `workflow_id`. Inspect [`MigrationReport`] for errors.
29//! 3. Remove the old FV from the adapter registry and redeploy.
30//!
31//! > **Note**: `MigrationRunner::run_and_update_registry` handles the
32//! > `ProcessRegistry` update automatically (updating `ProcessIdentity.workflow_id`
33//! > for every primary process-keyed entry). For conversation- or correlation-keyed
34//! > entries (which are short-lived and self-expire) no action is needed.
35//! > If you only want the snapshot migration without registry updates, use the
36//! > simpler `MigrationRunner::run()` instead.
37//!
38//! # Example
39//!
40//! ```rust,ignore
41//! use mako_engine::{
42//! migration::{MigrationRunner, StateMigration},
43//! version::WorkflowId,
44//! };
45//!
46//! struct SupplierChangeFv2024ToFv2025;
47//!
48//! impl StateMigration for SupplierChangeFv2024ToFv2025 {
49//! type FromWorkflow = GpkeSupplierChangeWorkflowFv2024;
50//! type ToWorkflow = GpkeSupplierChangeWorkflowFv2025;
51//!
52//! fn source_workflow_id(&self) -> &WorkflowId { &FV2024_WORKFLOW_ID }
53//! fn target_workflow_id(&self) -> &WorkflowId { &FV2025_WORKFLOW_ID }
54//!
55//! fn migrate(
56//! &self,
57//! state: SupplierChangeStateFv2024,
58//! ) -> Result<SupplierChangeStateFv2025, String> {
59//! Ok(SupplierChangeStateFv2025::from_v2024(state))
60//! }
61//! }
62//!
63//! let runner = MigrationRunner::new(
64//! SupplierChangeFv2024ToFv2025,
65//! event_store,
66//! snap_store,
67//! );
68//! let report = runner.run().await;
69//! assert!(report.is_ok(), "migration errors: {:?}", report.errors);
70//! ```
71//!
72//! [`ProcessRegistry`]: crate::registry::ProcessRegistry
73//! [`ProcessIdentity`]: crate::ids::ProcessIdentity
74//! [`EventStore::list_streams`]: crate::event_store::EventStore::list_streams
75
76use crate::{
77 event_store::EventStore,
78 ids::{ProcessId, StreamId, TenantId},
79 snapshot::{Snapshot, SnapshotStore},
80 version::WorkflowId,
81 workflow::Workflow,
82};
83
84// ── MigrationError ────────────────────────────────────────────────────────────
85
86/// Describes a failure to migrate a single process stream.
87#[derive(Debug, Clone)]
88pub struct MigrationError {
89 /// The stream that could not be migrated.
90 pub stream_id: StreamId,
91 /// Human-readable failure reason.
92 pub message: String,
93}
94
95impl std::fmt::Display for MigrationError {
96 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
97 write!(
98 f,
99 "migration error on stream {}: {}",
100 self.stream_id, self.message
101 )
102 }
103}
104
105impl std::error::Error for MigrationError {}
106
107// ── StateMigration ────────────────────────────────────────────────────────────
108
/// A typed, one-directional migration from one workflow version to another.
///
/// Implement this trait for each FV-to-FV transition your deployment requires.
/// The [`migrate`] function is called once per matched in-flight process
/// stream and must be **pure** (no I/O, no clock access, no global state).
///
/// ## Additive-only changes (same state type)
///
/// When both FVs share the same `State` type (only new optional fields added),
/// the migration can be a no-op:
///
/// ```rust,ignore
/// fn migrate(&self, state: SharedState) -> Result<SharedState, String> {
///     Ok(state)
/// }
/// ```
///
/// ## Structural changes (different state types)
///
/// When the state layout changed incompatibly (renamed variant, removed field,
/// changed discriminant), use different `FromWorkflow::State` and
/// `ToWorkflow::State` types. Building the new value with struct-literal
/// syntax makes the compiler reject any field that is left unmapped:
///
/// ```rust,ignore
/// fn migrate(&self, old: OldState) -> Result<NewState, String> {
///     match old {
///         OldState::Initiated(data) => Ok(NewState::Initiated(NewInitiatedData {
///             ems_id: data.ems_id,
///             malo_id: data.malo_id,
///             initiated: data.initiated_at, // renamed field
///         })),
///         OldState::Completed => Ok(NewState::Completed),
///     }
/// }
/// ```
///
/// [`migrate`]: StateMigration::migrate
pub trait StateMigration: Send + Sync + 'static {
    /// The old workflow definition whose events are stored in matched streams.
    ///
    /// The runner folds a stream's events with this workflow's `apply` to
    /// rebuild the state handed to [`StateMigration::migrate`].
    type FromWorkflow: Workflow;
    /// The new workflow definition that continues execution after migration.
    ///
    /// Its `state_schema_version()` is the version recorded on the snapshot
    /// the runner writes.
    type ToWorkflow: Workflow;

    /// The `WorkflowId` (name + old BDEW FV) that identifies streams to migrate.
    ///
    /// The runner compares it against the `workflow_id` of a stream's first
    /// event.
    fn source_workflow_id(&self) -> &WorkflowId;

    /// The `WorkflowId` (name + new BDEW FV) the process continues under.
    ///
    /// The snapshot itself does not carry a `workflow_id`;
    /// `MigrationRunner::run_and_update_registry` writes this value into the
    /// process's registry entry.
    fn target_workflow_id(&self) -> &WorkflowId;

    /// Map the fully-replayed old state to the new state type.
    ///
    /// Called once per matched stream. Must be **pure**.
    ///
    /// # Errors
    ///
    /// Return `Err(human_readable_reason)` when the state cannot be migrated.
    /// The runner records the failure in [`MigrationReport::errors`] and
    /// continues with remaining streams — a single bad stream does not abort
    /// the entire migration.
    fn migrate(
        &self,
        state: <Self::FromWorkflow as Workflow>::State,
    ) -> Result<<Self::ToWorkflow as Workflow>::State, String>;
}
174
175// ── MigrationReport ───────────────────────────────────────────────────────────
176
177/// Summary produced by a [`MigrationRunner::run`] call.
178#[derive(Debug, Default)]
179pub struct MigrationReport {
180 /// Number of streams successfully migrated and snapshotted.
181 pub migrated: usize,
182 /// Number of streams skipped (wrong `workflow_id`, empty, or already migrated).
183 pub skipped: usize,
184 /// Streams that encountered an error during replay, migration, or snapshot.
185 pub errors: Vec<MigrationError>,
186}
187
188impl MigrationReport {
189 /// Return `true` when no migration errors occurred.
190 ///
191 /// A `true` result does not imply that all processes were migrated — only
192 /// that all matched processes succeeded. Check [`migrated`] and [`skipped`]
193 /// to verify expected migration counts.
194 ///
195 /// [`migrated`]: MigrationReport::migrated
196 /// [`skipped`]: MigrationReport::skipped
197 #[must_use]
198 pub fn is_ok(&self) -> bool {
199 self.errors.is_empty()
200 }
201
202 /// Merge another report into this one (accumulate counts and errors).
203 pub fn merge(&mut self, other: MigrationReport) {
204 self.migrated += other.migrated;
205 self.skipped += other.skipped;
206 self.errors.extend(other.errors);
207 }
208}
209
210impl std::fmt::Display for MigrationReport {
211 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
212 write!(
213 f,
214 "MigrationReport {{ migrated: {}, skipped: {}, errors: {} }}",
215 self.migrated,
216 self.skipped,
217 self.errors.len(),
218 )
219 }
220}
221
222// ── IdentityMigration ─────────────────────────────────────────────────────────
223
224/// A no-op [`StateMigration`] for workflows whose state schema did **not** change
225/// between two BDEW format versions.
226///
227/// Use this when the FV transition only added new optional AHB rules, segment
228/// cardinality changes, or code-list entries — no field was renamed, removed,
229/// or made mandatory. The migrated snapshot repoints `workflow_id` to the new
230/// FV while keeping the state value identical.
231///
232/// # Example
233///
234/// ```rust,ignore
235/// use mako_engine::migration::IdentityMigration;
236/// use mako_engine::version::WorkflowId;
237/// use mako_gpke::lf_anmeldung::{GpkeLfAnmeldungWorkflow, WORKFLOW_NAME};
238///
239/// let migration = IdentityMigration::<GpkeLfAnmeldungWorkflow>::new(
240/// WorkflowId::new(WORKFLOW_NAME, "FV2025-10-01"),
241/// WorkflowId::new(WORKFLOW_NAME, "FV2026-10-01"),
242/// );
243/// ```
244pub struct IdentityMigration<W> {
245 source: WorkflowId,
246 target: WorkflowId,
247 _w: std::marker::PhantomData<fn() -> W>,
248}
249
250impl<W: Workflow> IdentityMigration<W> {
251 /// Construct a new identity migration between two format versions.
252 #[must_use]
253 pub fn new(source: WorkflowId, target: WorkflowId) -> Self {
254 Self {
255 source,
256 target,
257 _w: std::marker::PhantomData,
258 }
259 }
260}
261
262impl<W> StateMigration for IdentityMigration<W>
263where
264 W: Workflow + Send + Sync + 'static,
265 W::State: Clone,
266{
267 type FromWorkflow = W;
268 type ToWorkflow = W;
269
270 fn source_workflow_id(&self) -> &WorkflowId {
271 &self.source
272 }
273
274 fn target_workflow_id(&self) -> &WorkflowId {
275 &self.target
276 }
277
278 fn migrate(&self, state: W::State) -> Result<W::State, String> {
279 Ok(state)
280 }
281}
282
283// ── helpers ────────────────────────────────────────────────────────────────────
284
285/// Parse `(tenant_id, process_id)` from a process stream identifier.
286///
287/// Stream IDs for process streams follow the format
288/// `process/{tenant_uuid}/{process_uuid}` (includes a tenant discriminator
289/// for single-tenant isolation). Returns `None` when the format does not match.
290fn parse_process_stream_id(stream_id: &str) -> Option<(TenantId, ProcessId)> {
291 let rest = stream_id.strip_prefix("process/")?;
292 let (tenant_str, process_str) = rest.split_once('/')?;
293 let tenant_uuid = uuid::Uuid::parse_str(tenant_str).ok()?;
294 let process_uuid = uuid::Uuid::parse_str(process_str).ok()?;
295 Some((
296 TenantId::from_uuid(tenant_uuid),
297 ProcessId::from_uuid(process_uuid),
298 ))
299}
300
301// ── MigrationRunner ───────────────────────────────────────────────────────────
302
303/// Drives a [`StateMigration`] over every event stream in a store.
304///
305/// Constructed with separate [`EventStore`] and [`SnapshotStore`] handles so
306/// the runner can operate against any backend combination (e.g. `SlateDbStore`
307/// for events and `SlateDbSnapshotStore` for snapshots, or in-memory stores
308/// during testing).
309///
310/// # Concurrency
311///
312/// `run()` processes streams **sequentially**. For deployments with thousands
313/// of in-flight processes, wrapping the call in a dedicated migration task and
314/// using a custom prefix filter via [`EventStore::list_streams`] (e.g. filtering
315/// by stream-id prefix) can reduce the scan scope.
316pub struct MigrationRunner<M, ES, SS> {
317 migration: M,
318 event_store: ES,
319 snap_store: SS,
320}
321
322impl<M, ES, SS> MigrationRunner<M, ES, SS>
323where
324 M: StateMigration,
325 <M::FromWorkflow as Workflow>::State: serde::de::DeserializeOwned,
326 <M::ToWorkflow as Workflow>::State: serde::Serialize,
327 ES: EventStore,
328 SS: SnapshotStore,
329{
330 /// Construct a new runner.
331 #[must_use]
332 pub fn new(migration: M, event_store: ES, snap_store: SS) -> Self {
333 Self {
334 migration,
335 event_store,
336 snap_store,
337 }
338 }
339
340 /// Scan all event streams and migrate those that match `source_workflow_id`.
341 ///
342 /// - Streams with no events or a different `workflow_id` are counted in
343 /// [`MigrationReport::skipped`].
344 /// - Streams that fail (replay error, `migrate()` returning `Err`, or
345 /// snapshot write failure) are recorded in [`MigrationReport::errors`]
346 /// and do **not** abort the run.
347 ///
348 /// If `list_streams` itself fails, a single error entry covering
349 /// `"(list_streams)"` is returned immediately.
350 pub async fn run(&self) -> MigrationReport {
351 let streams = match self.event_store.list_streams(None).await {
352 Ok(s) => s,
353 Err(e) => {
354 return MigrationReport {
355 errors: vec![MigrationError {
356 stream_id: StreamId::new("(list_streams)"),
357 message: format!("list_streams failed: {e}"),
358 }],
359 ..Default::default()
360 };
361 }
362 };
363
364 let mut report = MigrationReport::default();
365
366 for stream_id in streams {
367 match self.migrate_stream(&stream_id).await {
368 Ok(true) => report.migrated += 1,
369 Ok(false) => report.skipped += 1,
370 Err(err) => report.errors.push(err),
371 }
372 }
373
374 report
375 }
376
377 /// Like [`run`] but also updates [`ProcessRegistry`] entries after each
378 /// successful migration.
379 ///
380 /// For every migrated stream the runner:
381 ///
382 /// 1. Parses `(tenant_id, process_id)` from the stream ID
383 /// (`process/{tenant_id}/{process_id}`).
384 /// 2. Looks up `RegistryKey::from_process(process_id)` for that tenant.
385 /// 3. Rewrites the stored [`ProcessIdentity`] with the new `workflow_id`
386 /// and updated `stream_id` (which embeds the tenant discriminator).
387 ///
388 /// Entries for conversation- or correlation-based routing keys are
389 /// typically short-lived (they exist only during an active EDIFACT
390 /// exchange) and do not need updating here.
391 ///
392 /// Registry update failures are recorded as warnings in the returned
393 /// [`MigrationReport`] but do **not** roll back the snapshot that was
394 /// already written.
395 ///
396 /// [`run`]: MigrationRunner::run
397 /// [`ProcessRegistry`]: crate::registry::ProcessRegistry
398 /// [`ProcessIdentity`]: crate::ids::ProcessIdentity
399 pub async fn run_and_update_registry<R>(&self, registry: &R) -> MigrationReport
400 where
401 R: crate::registry::ProcessRegistry,
402 {
403 let streams = match self.event_store.list_streams(None).await {
404 Ok(s) => s,
405 Err(e) => {
406 return MigrationReport {
407 errors: vec![MigrationError {
408 stream_id: StreamId::new("(list_streams)"),
409 message: format!("list_streams failed: {e}"),
410 }],
411 ..Default::default()
412 };
413 }
414 };
415
416 let mut report = MigrationReport::default();
417
418 for stream_id in streams {
419 match self.migrate_stream(&stream_id).await {
420 Ok(false) => {
421 report.skipped += 1;
422 }
423 Ok(true) => {
424 report.migrated += 1;
425 // Best-effort registry update: parse tenant + process from
426 // the stream ID and rewrite the primary process-keyed entry.
427 if let Some((tenant_id, process_id)) =
428 parse_process_stream_id(stream_id.as_str())
429 {
430 let key = crate::registry::RegistryKey::from_process(process_id);
431 match registry.lookup(tenant_id, &key).await {
432 Ok(Some(mut identity)) => {
433 // Rebind workflow_id to the new version; the
434 // stream_id is already correct (unchanged by
435 // migration).
436 identity.workflow_id = self.migration.target_workflow_id().clone();
437 if let Err(e) = registry.register(tenant_id, &key, identity).await {
438 report.errors.push(MigrationError {
439 stream_id: stream_id.clone(),
440 message: format!(
441 "registry update failed for process {process_id}: {e}"
442 ),
443 });
444 }
445 }
446 Ok(None) => {
447 // No direct-process registry entry — process
448 // might only be accessible via conversation/
449 // correlation keys; nothing to do here.
450 }
451 Err(e) => {
452 report.errors.push(MigrationError {
453 stream_id: stream_id.clone(),
454 message: format!(
455 "registry lookup failed for process {process_id}: {e}"
456 ),
457 });
458 }
459 }
460 } else {
461 tracing::warn!(
462 stream_id = stream_id.as_str(),
463 "run_and_update_registry: cannot parse tenant/process from \
464 stream_id — registry update skipped for this stream",
465 );
466 }
467 }
468 Err(err) => {
469 report.errors.push(err);
470 }
471 }
472 }
473
474 report
475 }
476
477 /// Attempt to migrate a single stream.
478 ///
479 /// Returns `Ok(true)` when the stream was migrated, `Ok(false)` when it
480 /// was skipped.
481 async fn migrate_stream(&self, stream_id: &StreamId) -> Result<bool, MigrationError> {
482 // Load all events. We need them for both the workflow_id check (peek
483 // the first event) and the fold. A cursor-based alternative would
484 // require two round-trips; load_all is acceptable for a one-time
485 // migration operation.
486 let events = self
487 .event_store
488 .load(stream_id)
489 .await
490 .map_err(|e| MigrationError {
491 stream_id: stream_id.clone(),
492 message: format!("event load failed: {e}"),
493 })?;
494
495 let Some(first) = events.first() else {
496 // Empty stream — nothing to migrate.
497 return Ok(false);
498 };
499
500 if &first.workflow_id != self.migration.source_workflow_id() {
501 // Different workflow — skip.
502 return Ok(false);
503 }
504
505 // Fold state using FromWorkflow.
506 let mut state = <M::FromWorkflow as Workflow>::State::default();
507 let last_seq = events.last().map_or(0, |e| e.sequence_number);
508
509 for env in events {
510 let payload = M::FromWorkflow::upcast(&env.event_type, env.schema_version, env.payload)
511 .map_err(|e| MigrationError {
512 stream_id: stream_id.clone(),
513 message: format!("upcast failed on seq {}: {e}", env.sequence_number),
514 })?;
515 let event: <M::FromWorkflow as Workflow>::Event = serde_json::from_value(payload)
516 .map_err(|e| MigrationError {
517 stream_id: stream_id.clone(),
518 message: format!("event deserialize failed: {e}"),
519 })?;
520 state = M::FromWorkflow::apply(state, &event);
521 }
522
523 // Apply the user-supplied migration function.
524 let new_state = self
525 .migration
526 .migrate(state)
527 .map_err(|msg| MigrationError {
528 stream_id: stream_id.clone(),
529 message: msg,
530 })?;
531
532 // Serialize the migrated state.
533 let payload = serde_json::to_value(&new_state).map_err(|e| MigrationError {
534 stream_id: stream_id.clone(),
535 message: format!("state serialization failed: {e}"),
536 })?;
537
538 // Save a snapshot under the new workflow's schema version. Future
539 // Process::<ToWorkflow>::state_with_snapshot() calls will load this
540 // snapshot and skip the old (incompatible) events entirely.
541 let snap = Snapshot::new(
542 stream_id.clone(),
543 last_seq,
544 M::ToWorkflow::state_schema_version(),
545 payload,
546 );
547 self.snap_store
548 .save(&snap)
549 .await
550 .map_err(|e| MigrationError {
551 stream_id: stream_id.clone(),
552 message: format!("snapshot save failed: {e}"),
553 })?;
554
555 Ok(true)
556 }
557}
558
559// ── Tests ─────────────────────────────────────────────────────────────────────
560
#[cfg(test)]
mod tests {
    //! Unit tests for the migration runner, built around a tiny "counter"
    //! workflow in two versions (V2 adds a `label` field to the state).

    use super::*;
    use crate::{
        envelope::NewEvent,
        event_store::{ExpectedVersion, InMemoryEventStore},
        ids::{ConversationId, CorrelationId, ProcessId, StreamId, TenantId},
        snapshot::NoopSnapshotStore,
        version::WorkflowId,
        workflow::{CommandPayload, EventPayload, Workflow},
    };

    // ── Minimal "counter" workflows for test purposes ─────────────────────────

    #[derive(Default, Clone, serde::Serialize, serde::Deserialize, PartialEq, Debug)]
    struct CounterStateV1 {
        count: u32,
    }

    #[derive(Clone, serde::Serialize, serde::Deserialize)]
    enum CounterEventV1 {
        Incremented,
    }

    impl EventPayload for CounterEventV1 {
        fn event_type(&self) -> &'static str {
            "Incremented"
        }
    }

    // Uninhabited: the migration never issues commands.
    #[derive(Clone)]
    enum CounterCommandV1 {}

    impl CommandPayload for CounterCommandV1 {}

    struct CounterWorkflowV1;
    impl Workflow for CounterWorkflowV1 {
        type State = CounterStateV1;
        type Event = CounterEventV1;
        type Command = CounterCommandV1;

        fn handle(
            _state: &Self::State,
            _cmd: Self::Command,
        ) -> Result<crate::workflow::WorkflowOutput<Self::Event>, crate::error::WorkflowError>
        {
            unreachable!("not used in migration tests")
        }

        fn apply(mut state: Self::State, event: &Self::Event) -> Self::State {
            match event {
                CounterEventV1::Incremented => state.count += 1,
            }
            state
        }
    }

    // V2 adds a `label` field.
    #[derive(Default, Clone, serde::Serialize, serde::Deserialize, PartialEq, Debug)]
    struct CounterStateV2 {
        count: u32,
        label: String,
    }

    #[derive(Clone, serde::Serialize, serde::Deserialize)]
    enum CounterEventV2 {
        Incremented,
    }

    impl EventPayload for CounterEventV2 {
        fn event_type(&self) -> &'static str {
            "Incremented"
        }
    }

    // Uninhabited: the migration never issues commands.
    #[derive(Clone)]
    enum CounterCommandV2 {}

    impl CommandPayload for CounterCommandV2 {}

    struct CounterWorkflowV2;
    impl Workflow for CounterWorkflowV2 {
        type State = CounterStateV2;
        type Event = CounterEventV2;
        type Command = CounterCommandV2;

        fn handle(
            _state: &Self::State,
            _cmd: Self::Command,
        ) -> Result<crate::workflow::WorkflowOutput<Self::Event>, crate::error::WorkflowError>
        {
            unreachable!("not used in migration tests")
        }

        fn apply(mut state: Self::State, event: &Self::Event) -> Self::State {
            match event {
                CounterEventV2::Incremented => state.count += 1,
            }
            state
        }

        fn state_schema_version() -> u32 {
            2
        }
    }

    // ── Shared workflow ids ───────────────────────────────────────────────────

    /// Workflow id of the old ("V1") counter; shared by all test migrations.
    fn source_wid() -> &'static WorkflowId {
        static WID: std::sync::OnceLock<WorkflowId> = std::sync::OnceLock::new();
        WID.get_or_init(|| WorkflowId::new("counter", "FV2024-10-01"))
    }

    /// Workflow id of the new ("V2") counter; shared by all test migrations.
    fn target_wid() -> &'static WorkflowId {
        static WID: std::sync::OnceLock<WorkflowId> = std::sync::OnceLock::new();
        WID.get_or_init(|| WorkflowId::new("counter", "FV2025-04-01"))
    }

    // ── Migration implementation ──────────────────────────────────────────────

    /// Copies the count and stamps the label `"migrated"`.
    struct V1ToV2;

    impl StateMigration for V1ToV2 {
        type FromWorkflow = CounterWorkflowV1;
        type ToWorkflow = CounterWorkflowV2;

        fn source_workflow_id(&self) -> &WorkflowId {
            source_wid()
        }

        fn target_workflow_id(&self) -> &WorkflowId {
            target_wid()
        }

        fn migrate(&self, state: CounterStateV1) -> Result<CounterStateV2, String> {
            Ok(CounterStateV2 {
                count: state.count,
                label: "migrated".into(),
            })
        }
    }

    // ── Helpers ───────────────────────────────────────────────────────────────

    /// Build one `Incremented` event stamped with `workflow_id`.
    ///
    /// Every call mints fresh process/tenant/correlation ids; the runner only
    /// looks at `workflow_id`, `event_type`, `schema_version` and `payload`.
    fn make_increment_event(workflow_id: WorkflowId) -> NewEvent {
        let pid = ProcessId::new();
        let tid = TenantId::new();
        NewEvent {
            correlation_id: CorrelationId::new(),
            causation_id: None,
            conversation_id: ConversationId::new(),
            process_id: pid,
            tenant_id: tid,
            workflow_id,
            event_type: "Incremented".into(),
            schema_version: 1,
            payload: serde_json::to_value(CounterEventV1::Incremented).unwrap(),
        }
    }

    // ── Runner tests ──────────────────────────────────────────────────────────

    #[tokio::test]
    async fn migrate_matching_stream() {
        let store = InMemoryEventStore::default();
        let snaps = crate::snapshot::InMemorySnapshotStore::new();
        let sid = StreamId::new("process/counter-001");
        let wid_v1 = source_wid().clone();

        // Append 3 increment events under the V1 workflow.
        store
            .append(
                &sid,
                ExpectedVersion::Any,
                &[
                    make_increment_event(wid_v1.clone()),
                    make_increment_event(wid_v1.clone()),
                    make_increment_event(wid_v1.clone()),
                ],
            )
            .await
            .unwrap();

        let runner = MigrationRunner::new(V1ToV2, store, snaps.clone());
        let report = runner.run().await;

        assert!(report.is_ok(), "errors: {:?}", report.errors);
        assert_eq!(report.migrated, 1);
        assert_eq!(report.skipped, 0);

        // The snapshot should encode the migrated V2 state.
        let snap = snaps.load(&sid).await.unwrap().expect("snapshot saved");
        assert_eq!(snap.state_schema_version, 2);
        let state: CounterStateV2 = serde_json::from_value(snap.state).unwrap();
        assert_eq!(state.count, 3);
        assert_eq!(state.label, "migrated");
    }

    #[tokio::test]
    async fn skip_non_matching_stream() {
        let store = InMemoryEventStore::default();
        let snaps = NoopSnapshotStore;
        let sid = StreamId::new("process/counter-other");
        let wid_v2 = target_wid().clone(); // stream started under V2

        store
            .append(&sid, ExpectedVersion::Any, &[make_increment_event(wid_v2)])
            .await
            .unwrap();

        let runner = MigrationRunner::new(V1ToV2, store, snaps);
        let report = runner.run().await;

        assert!(report.is_ok());
        assert_eq!(report.migrated, 0);
        assert_eq!(report.skipped, 1);
    }

    #[tokio::test]
    async fn empty_store_produces_empty_report() {
        let store = InMemoryEventStore::default();
        let snaps = NoopSnapshotStore;
        // Nothing was appended, so there are no streams to visit and every
        // counter stays at zero.
        let runner = MigrationRunner::new(V1ToV2, store, snaps);
        let report = runner.run().await;

        assert!(report.is_ok());
        assert_eq!(report.migrated, 0);
        assert_eq!(report.skipped, 0);
    }

    #[tokio::test]
    async fn migration_fn_error_is_recorded_not_fatal() {
        struct FailingMigration;

        impl StateMigration for FailingMigration {
            type FromWorkflow = CounterWorkflowV1;
            type ToWorkflow = CounterWorkflowV2;

            fn source_workflow_id(&self) -> &WorkflowId {
                source_wid()
            }

            fn target_workflow_id(&self) -> &WorkflowId {
                target_wid()
            }

            fn migrate(&self, _state: CounterStateV1) -> Result<CounterStateV2, String> {
                Err("intentional test failure".into())
            }
        }

        let store = InMemoryEventStore::default();
        let snaps = NoopSnapshotStore;
        let sid = StreamId::new("process/failing");
        let wid_v1 = source_wid().clone();

        store
            .append(&sid, ExpectedVersion::Any, &[make_increment_event(wid_v1)])
            .await
            .unwrap();

        let runner = MigrationRunner::new(FailingMigration, store, snaps);
        let report = runner.run().await;

        assert!(!report.is_ok());
        assert_eq!(report.errors.len(), 1);
        assert_eq!(report.migrated, 0);
        assert!(
            report.errors[0]
                .message
                .contains("intentional test failure")
        );
    }

    #[tokio::test]
    async fn identity_migration_snapshots_unchanged_state() {
        let store = InMemoryEventStore::default();
        let snaps = crate::snapshot::InMemorySnapshotStore::new();
        let sid = StreamId::new("process/counter-identity");

        store
            .append(
                &sid,
                ExpectedVersion::Any,
                &[
                    make_increment_event(source_wid().clone()),
                    make_increment_event(source_wid().clone()),
                ],
            )
            .await
            .unwrap();

        let migration = IdentityMigration::<CounterWorkflowV1>::new(
            source_wid().clone(),
            target_wid().clone(),
        );
        let runner = MigrationRunner::new(migration, store, snaps.clone());
        let report = runner.run().await;

        assert!(report.is_ok(), "errors: {:?}", report.errors);
        assert_eq!(report.migrated, 1);

        // Same workflow on both sides: the snapshot carries that workflow's
        // schema version and the folded state as-is.
        let snap = snaps.load(&sid).await.unwrap().expect("snapshot saved");
        assert_eq!(
            snap.state_schema_version,
            CounterWorkflowV1::state_schema_version()
        );
        let state: CounterStateV1 = serde_json::from_value(snap.state).unwrap();
        assert_eq!(state, CounterStateV1 { count: 2 });
    }

    // ── Helper and report tests ───────────────────────────────────────────────

    #[test]
    fn parse_process_stream_id_accepts_two_uuids() {
        let sid = "process/11111111-1111-1111-1111-111111111111/\
                   22222222-2222-2222-2222-222222222222";
        assert!(parse_process_stream_id(sid).is_some());
    }

    #[test]
    fn parse_process_stream_id_rejects_malformed_ids() {
        let malformed = [
            "",
            "process/",
            "process/counter-001",
            "process/11111111-1111-1111-1111-111111111111",
            "process/not-a-uuid/22222222-2222-2222-2222-222222222222",
            "process/11111111-1111-1111-1111-111111111111/not-a-uuid",
            "other/11111111-1111-1111-1111-111111111111/22222222-2222-2222-2222-222222222222",
            "process/11111111-1111-1111-1111-111111111111/22222222-2222-2222-2222-222222222222/x",
        ];
        for sid in malformed {
            assert!(parse_process_stream_id(sid).is_none(), "accepted {sid:?}");
        }
    }

    #[test]
    fn report_merge_accumulates_counts_and_errors() {
        let mut total = MigrationReport {
            migrated: 2,
            skipped: 1,
            errors: Vec::new(),
        };
        assert!(total.is_ok());

        total.merge(MigrationReport {
            migrated: 1,
            skipped: 3,
            errors: vec![MigrationError {
                stream_id: StreamId::new("process/x"),
                message: "boom".into(),
            }],
        });

        assert!(!total.is_ok());
        assert_eq!(total.migrated, 3);
        assert_eq!(total.skipped, 4);
        assert_eq!(total.errors.len(), 1);
        assert_eq!(
            total.to_string(),
            "MigrationReport { migrated: 3, skipped: 4, errors: 1 }"
        );
    }
}