Skip to main content

mako_engine/
migration.rs

1//! In-flight process state migration across BDEW format-version boundaries.
2//!
3//! When an old BDEW format version (FV) is removed from the adapter registry
4//! after its grace period, any process initiated under that FV can no longer
5//! receive new events — the `ForwardCompatible` policy rejects FV mismatches
6//! at dispatch time. [`StateMigration`] + [`MigrationRunner`] provide the
7//! tooling to advance those processes to a newer FV *before* the old FV is
8//! retired.
9//!
10//! # How it works
11//!
12//! 1. **Scan** all event streams via [`EventStore::list_streams`].
13//! 2. **Filter** streams whose first event carries
14//!    `workflow_id == migration.source_workflow_id()`.
15//! 3. **Replay** each matched stream using `FromWorkflow::apply` to reconstruct
16//!    the fully-folded state.
17//! 4. **Migrate** the state via [`StateMigration::migrate`].
//! 5. **Snapshot** the migrated state under the `ToWorkflow` state schema version
//!    (`ToWorkflow::state_schema_version()`) so the process can continue executing
//!    under the new workflow definition without replaying old incompatible events.
21//!
22//! # Deployment sequence
23//!
24//! 1. Deploy the new binary with **both** FVs still registered in the adapter
25//!    registry.
26//! 2. Run `MigrationRunner::run_and_update_registry(&registry)` — all in-flight
27//!    processes are migrated and their routing-table entries are rewritten to use
28//!    the new `workflow_id`. Inspect [`MigrationReport`] for errors.
29//! 3. Remove the old FV from the adapter registry and redeploy.
30//!
31//! > **Note**: `MigrationRunner::run_and_update_registry` handles the
32//! > `ProcessRegistry` update automatically (updating `ProcessIdentity.workflow_id`
33//! > for every primary process-keyed entry). For conversation- or correlation-keyed
34//! > entries (which are short-lived and self-expire) no action is needed.
35//! > If you only want the snapshot migration without registry updates, use the
36//! > simpler `MigrationRunner::run()` instead.
37//!
38//! # Example
39//!
40//! ```rust,ignore
41//! use mako_engine::{
42//!     migration::{MigrationRunner, StateMigration},
43//!     version::WorkflowId,
44//! };
45//!
46//! struct SupplierChangeFv2024ToFv2025;
47//!
48//! impl StateMigration for SupplierChangeFv2024ToFv2025 {
49//!     type FromWorkflow = GpkeSupplierChangeWorkflowFv2024;
50//!     type ToWorkflow   = GpkeSupplierChangeWorkflowFv2025;
51//!
52//!     fn source_workflow_id(&self) -> &WorkflowId { &FV2024_WORKFLOW_ID }
53//!     fn target_workflow_id(&self)   -> &WorkflowId { &FV2025_WORKFLOW_ID }
54//!
55//!     fn migrate(
56//!         &self,
57//!         state: SupplierChangeStateFv2024,
58//!     ) -> Result<SupplierChangeStateFv2025, String> {
59//!         Ok(SupplierChangeStateFv2025::from_v2024(state))
60//!     }
61//! }
62//!
63//! let runner = MigrationRunner::new(
64//!     SupplierChangeFv2024ToFv2025,
65//!     event_store,
66//!     snap_store,
67//! );
68//! let report = runner.run().await;
69//! assert!(report.is_ok(), "migration errors: {:?}", report.errors);
70//! ```
71//!
72//! [`ProcessRegistry`]: crate::registry::ProcessRegistry
73//! [`ProcessIdentity`]: crate::ids::ProcessIdentity
74//! [`EventStore::list_streams`]: crate::event_store::EventStore::list_streams
75
76use crate::{
77    event_store::EventStore,
78    ids::{ProcessId, StreamId, TenantId},
79    snapshot::{Snapshot, SnapshotStore},
80    version::WorkflowId,
81    workflow::Workflow,
82};
83
84// ── MigrationError ────────────────────────────────────────────────────────────
85
86/// Describes a failure to migrate a single process stream.
87#[derive(Debug, Clone)]
88pub struct MigrationError {
89    /// The stream that could not be migrated.
90    pub stream_id: StreamId,
91    /// Human-readable failure reason.
92    pub message: String,
93}
94
95impl std::fmt::Display for MigrationError {
96    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
97        write!(
98            f,
99            "migration error on stream {}: {}",
100            self.stream_id, self.message
101        )
102    }
103}
104
105impl std::error::Error for MigrationError {}
106
107// ── StateMigration ────────────────────────────────────────────────────────────
108
109/// A typed, one-directional migration from one workflow version to another.
110///
111/// Implement this trait for each FV-to-FV transition your deployment requires.
112/// The [`migrate`] function is called once per in-flight process stream and must
113/// be **pure** (no I/O, no clock access, no global state).
114///
115/// ## Additive-only changes (same state type)
116///
117/// When both FVs share the same `State` type (only new optional fields added),
118/// the migration can be a no-op:
119///
120/// ```rust,ignore
121/// fn migrate(&self, state: SharedState) -> Result<SharedState, String> {
122///     Ok(state)
123/// }
124/// ```
125///
126/// ## Structural changes (different state types)
127///
128/// When the state layout changed incompatibly (renamed variant, removed field,
129/// changed discriminant), use different `FromWorkflow::State` and
130/// `ToWorkflow::State` types. The Rust compiler will enforce that every field
131/// is explicitly mapped:
132///
133/// ```rust,ignore
134/// fn migrate(&self, old: OldState) -> Result<NewState, String> {
135///     match old {
136///         OldState::Initiated(data) => Ok(NewState::Initiated(NewInitiatedData {
137///             ems_id:    data.ems_id,
138///             malo_id:   data.malo_id,
139///             initiated: data.initiated_at,  // renamed field
140///         })),
141///         OldState::Completed => Ok(NewState::Completed),
142///     }
143/// }
144/// ```
145///
146/// [`migrate`]: StateMigration::migrate
147pub trait StateMigration: Send + Sync + 'static {
148    /// The old workflow definition whose events are stored in matched streams.
149    type FromWorkflow: Workflow;
150    /// The new workflow definition that continues execution after migration.
151    type ToWorkflow: Workflow;
152
153    /// The `WorkflowId` (name + old BDEW FV) that identifies streams to migrate.
154    fn source_workflow_id(&self) -> &WorkflowId;
155
156    /// The `WorkflowId` (name + new BDEW FV) stamped into the migrated snapshot.
157    fn target_workflow_id(&self) -> &WorkflowId;
158
159    /// Map the fully-replayed old state to the new state type.
160    ///
161    /// Called once per matched stream. Must be **pure**.
162    ///
163    /// # Errors
164    ///
165    /// Return `Err(human_readable_reason)` when the state cannot be migrated.
166    /// The runner records the failure in [`MigrationReport::errors`] and
167    /// continues with remaining streams — a single bad stream does not abort
168    /// the entire migration.
169    fn migrate(
170        &self,
171        state: <Self::FromWorkflow as Workflow>::State,
172    ) -> Result<<Self::ToWorkflow as Workflow>::State, String>;
173}
174
175// ── MigrationReport ───────────────────────────────────────────────────────────
176
177/// Summary produced by a [`MigrationRunner::run`] call.
178#[derive(Debug, Default)]
179pub struct MigrationReport {
180    /// Number of streams successfully migrated and snapshotted.
181    pub migrated: usize,
182    /// Number of streams skipped (wrong `workflow_id`, empty, or already migrated).
183    pub skipped: usize,
184    /// Streams that encountered an error during replay, migration, or snapshot.
185    pub errors: Vec<MigrationError>,
186}
187
188impl MigrationReport {
189    /// Return `true` when no migration errors occurred.
190    ///
191    /// A `true` result does not imply that all processes were migrated — only
192    /// that all matched processes succeeded. Check [`migrated`] and [`skipped`]
193    /// to verify expected migration counts.
194    ///
195    /// [`migrated`]: MigrationReport::migrated
196    /// [`skipped`]: MigrationReport::skipped
197    #[must_use]
198    pub fn is_ok(&self) -> bool {
199        self.errors.is_empty()
200    }
201}
202
203impl std::fmt::Display for MigrationReport {
204    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
205        write!(
206            f,
207            "MigrationReport {{ migrated: {}, skipped: {}, errors: {} }}",
208            self.migrated,
209            self.skipped,
210            self.errors.len(),
211        )
212    }
213}
214
215// ── helpers ────────────────────────────────────────────────────────────────────
216
217/// Parse `(tenant_id, process_id)` from a process stream identifier.
218///
219/// Stream IDs for process streams follow the format
220/// `process/{tenant_uuid}/{process_uuid}` (includes a tenant discriminator
221/// for single-tenant isolation).  Returns `None` when the format does not match.
222fn parse_process_stream_id(stream_id: &str) -> Option<(TenantId, ProcessId)> {
223    let rest = stream_id.strip_prefix("process/")?;
224    let (tenant_str, process_str) = rest.split_once('/')?;
225    let tenant_uuid = uuid::Uuid::parse_str(tenant_str).ok()?;
226    let process_uuid = uuid::Uuid::parse_str(process_str).ok()?;
227    Some((
228        TenantId::from_uuid(tenant_uuid),
229        ProcessId::from_uuid(process_uuid),
230    ))
231}
232
233// ── MigrationRunner ───────────────────────────────────────────────────────────
234
235/// Drives a [`StateMigration`] over every event stream in a store.
236///
237/// Constructed with separate [`EventStore`] and [`SnapshotStore`] handles so
238/// the runner can operate against any backend combination (e.g. `SlateDbStore`
239/// for events and `SlateDbSnapshotStore` for snapshots, or in-memory stores
240/// during testing).
241///
242/// # Concurrency
243///
244/// `run()` processes streams **sequentially**. For deployments with thousands
245/// of in-flight processes, wrapping the call in a dedicated migration task and
246/// using a custom prefix filter via [`EventStore::list_streams`] (e.g. filtering
247/// by stream-id prefix) can reduce the scan scope.
248pub struct MigrationRunner<M, ES, SS> {
249    migration: M,
250    event_store: ES,
251    snap_store: SS,
252}
253
254impl<M, ES, SS> MigrationRunner<M, ES, SS>
255where
256    M: StateMigration,
257    <M::FromWorkflow as Workflow>::State: serde::de::DeserializeOwned,
258    <M::ToWorkflow as Workflow>::State: serde::Serialize,
259    ES: EventStore,
260    SS: SnapshotStore,
261{
262    /// Construct a new runner.
263    #[must_use]
264    pub fn new(migration: M, event_store: ES, snap_store: SS) -> Self {
265        Self {
266            migration,
267            event_store,
268            snap_store,
269        }
270    }
271
272    /// Scan all event streams and migrate those that match `source_workflow_id`.
273    ///
274    /// - Streams with no events or a different `workflow_id` are counted in
275    ///   [`MigrationReport::skipped`].
276    /// - Streams that fail (replay error, `migrate()` returning `Err`, or
277    ///   snapshot write failure) are recorded in [`MigrationReport::errors`]
278    ///   and do **not** abort the run.
279    ///
280    /// If `list_streams` itself fails, a single error entry covering
281    /// `"(list_streams)"` is returned immediately.
282    pub async fn run(&self) -> MigrationReport {
283        let streams = match self.event_store.list_streams(None).await {
284            Ok(s) => s,
285            Err(e) => {
286                return MigrationReport {
287                    errors: vec![MigrationError {
288                        stream_id: StreamId::new("(list_streams)"),
289                        message: format!("list_streams failed: {e}"),
290                    }],
291                    ..Default::default()
292                };
293            }
294        };
295
296        let mut report = MigrationReport::default();
297
298        for stream_id in streams {
299            match self.migrate_stream(&stream_id).await {
300                Ok(true) => report.migrated += 1,
301                Ok(false) => report.skipped += 1,
302                Err(err) => report.errors.push(err),
303            }
304        }
305
306        report
307    }
308
309    /// Like [`run`] but also updates [`ProcessRegistry`] entries after each
310    /// successful migration.
311    ///
312    /// For every migrated stream the runner:
313    ///
314    /// 1. Parses `(tenant_id, process_id)` from the stream ID
315    ///    (`process/{tenant_id}/{process_id}`).
316    /// 2. Looks up `RegistryKey::from_process(process_id)` for that tenant.
317    /// 3. Rewrites the stored [`ProcessIdentity`] with the new `workflow_id`
318    ///    and updated `stream_id` (which embeds the tenant discriminator).
319    ///
320    /// Entries for conversation- or correlation-based routing keys are
321    /// typically short-lived (they exist only during an active EDIFACT
322    /// exchange) and do not need updating here.
323    ///
324    /// Registry update failures are recorded as warnings in the returned
325    /// [`MigrationReport`] but do **not** roll back the snapshot that was
326    /// already written.
327    ///
328    /// [`run`]: MigrationRunner::run
329    /// [`ProcessRegistry`]: crate::registry::ProcessRegistry
330    /// [`ProcessIdentity`]: crate::ids::ProcessIdentity
331    pub async fn run_and_update_registry<R>(&self, registry: &R) -> MigrationReport
332    where
333        R: crate::registry::ProcessRegistry,
334    {
335        let streams = match self.event_store.list_streams(None).await {
336            Ok(s) => s,
337            Err(e) => {
338                return MigrationReport {
339                    errors: vec![MigrationError {
340                        stream_id: StreamId::new("(list_streams)"),
341                        message: format!("list_streams failed: {e}"),
342                    }],
343                    ..Default::default()
344                };
345            }
346        };
347
348        let mut report = MigrationReport::default();
349
350        for stream_id in streams {
351            match self.migrate_stream(&stream_id).await {
352                Ok(false) => {
353                    report.skipped += 1;
354                }
355                Ok(true) => {
356                    report.migrated += 1;
357                    // Best-effort registry update: parse tenant + process from
358                    // the stream ID and rewrite the primary process-keyed entry.
359                    if let Some((tenant_id, process_id)) =
360                        parse_process_stream_id(stream_id.as_str())
361                    {
362                        let key = crate::registry::RegistryKey::from_process(process_id);
363                        match registry.lookup(tenant_id, &key).await {
364                            Ok(Some(mut identity)) => {
365                                // Rebind workflow_id to the new version; the
366                                // stream_id is already correct (unchanged by
367                                // migration).
368                                identity.workflow_id = self.migration.target_workflow_id().clone();
369                                if let Err(e) = registry.register(tenant_id, &key, identity).await {
370                                    report.errors.push(MigrationError {
371                                        stream_id: stream_id.clone(),
372                                        message: format!(
373                                            "registry update failed for process {process_id}: {e}"
374                                        ),
375                                    });
376                                }
377                            }
378                            Ok(None) => {
379                                // No direct-process registry entry — process
380                                // might only be accessible via conversation/
381                                // correlation keys; nothing to do here.
382                            }
383                            Err(e) => {
384                                report.errors.push(MigrationError {
385                                    stream_id: stream_id.clone(),
386                                    message: format!(
387                                        "registry lookup failed for process {process_id}: {e}"
388                                    ),
389                                });
390                            }
391                        }
392                    } else {
393                        tracing::warn!(
394                            stream_id = stream_id.as_str(),
395                            "run_and_update_registry: cannot parse tenant/process from \
396                             stream_id — registry update skipped for this stream",
397                        );
398                    }
399                }
400                Err(err) => {
401                    report.errors.push(err);
402                }
403            }
404        }
405
406        report
407    }
408
409    /// Attempt to migrate a single stream.
410    ///
411    /// Returns `Ok(true)` when the stream was migrated, `Ok(false)` when it
412    /// was skipped.
413    async fn migrate_stream(&self, stream_id: &StreamId) -> Result<bool, MigrationError> {
414        // Load all events. We need them for both the workflow_id check (peek
415        // the first event) and the fold. A cursor-based alternative would
416        // require two round-trips; load_all is acceptable for a one-time
417        // migration operation.
418        let events = self
419            .event_store
420            .load(stream_id)
421            .await
422            .map_err(|e| MigrationError {
423                stream_id: stream_id.clone(),
424                message: format!("event load failed: {e}"),
425            })?;
426
427        let Some(first) = events.first() else {
428            // Empty stream — nothing to migrate.
429            return Ok(false);
430        };
431
432        if &first.workflow_id != self.migration.source_workflow_id() {
433            // Different workflow — skip.
434            return Ok(false);
435        }
436
437        // Fold state using FromWorkflow.
438        let mut state = <M::FromWorkflow as Workflow>::State::default();
439        let last_seq = events.last().map_or(0, |e| e.sequence_number);
440
441        for env in events {
442            let payload = M::FromWorkflow::upcast(&env.event_type, env.schema_version, env.payload)
443                .map_err(|e| MigrationError {
444                    stream_id: stream_id.clone(),
445                    message: format!("upcast failed on seq {}: {e}", env.sequence_number),
446                })?;
447            let event: <M::FromWorkflow as Workflow>::Event = serde_json::from_value(payload)
448                .map_err(|e| MigrationError {
449                    stream_id: stream_id.clone(),
450                    message: format!("event deserialize failed: {e}"),
451                })?;
452            state = M::FromWorkflow::apply(state, &event);
453        }
454
455        // Apply the user-supplied migration function.
456        let new_state = self
457            .migration
458            .migrate(state)
459            .map_err(|msg| MigrationError {
460                stream_id: stream_id.clone(),
461                message: msg,
462            })?;
463
464        // Serialize the migrated state.
465        let payload = serde_json::to_value(&new_state).map_err(|e| MigrationError {
466            stream_id: stream_id.clone(),
467            message: format!("state serialization failed: {e}"),
468        })?;
469
470        // Save a snapshot under the new workflow's schema version. Future
471        // Process::<ToWorkflow>::state_with_snapshot() calls will load this
472        // snapshot and skip the old (incompatible) events entirely.
473        let snap = Snapshot::new(
474            stream_id.clone(),
475            last_seq,
476            M::ToWorkflow::state_schema_version(),
477            payload,
478        );
479        self.snap_store
480            .save(&snap)
481            .await
482            .map_err(|e| MigrationError {
483                stream_id: stream_id.clone(),
484                message: format!("snapshot save failed: {e}"),
485            })?;
486
487        Ok(true)
488    }
489}
490
491// ── Tests ─────────────────────────────────────────────────────────────────────
492
#[cfg(test)]
mod tests {
    //! Unit tests for the migration runner, built around two tiny "counter"
    //! workflows (V1 → V2) and the in-memory stores.

    use super::*;
    use crate::{
        envelope::NewEvent,
        event_store::{ExpectedVersion, InMemoryEventStore},
        ids::{ConversationId, CorrelationId, ProcessId, StreamId, TenantId},
        snapshot::NoopSnapshotStore,
        version::WorkflowId,
        workflow::{CommandPayload, EventPayload, Workflow},
    };

    // ── Minimal "counter" workflows for test purposes ─────────────────────────

    /// V1 state: just the number of increments seen.
    #[derive(Default, Clone, serde::Serialize, serde::Deserialize, PartialEq, Debug)]
    struct CounterStateV1 {
        count: u32,
    }

    #[derive(Clone, serde::Serialize, serde::Deserialize)]
    enum CounterEventV1 {
        Incremented,
    }

    impl EventPayload for CounterEventV1 {
        fn event_type(&self) -> &'static str {
            "Incremented"
        }
    }

    /// Uninhabited: the migration tests never dispatch commands.
    #[derive(Clone)]
    enum CounterCommandV1 {}

    impl CommandPayload for CounterCommandV1 {}

    struct CounterWorkflowV1;
    impl Workflow for CounterWorkflowV1 {
        type State = CounterStateV1;
        type Event = CounterEventV1;
        type Command = CounterCommandV1;

        fn handle(
            _state: &Self::State,
            _cmd: Self::Command,
        ) -> Result<crate::workflow::WorkflowOutput<Self::Event>, crate::error::WorkflowError>
        {
            unreachable!("not used in migration tests")
        }

        fn apply(mut state: Self::State, event: &Self::Event) -> Self::State {
            match event {
                CounterEventV1::Incremented => state.count += 1,
            }
            state
        }
    }

    // V2 adds a `label` field.
    #[derive(Default, Clone, serde::Serialize, serde::Deserialize, PartialEq, Debug)]
    struct CounterStateV2 {
        count: u32,
        label: String,
    }

    #[derive(Clone, serde::Serialize, serde::Deserialize)]
    enum CounterEventV2 {
        Incremented,
    }

    impl EventPayload for CounterEventV2 {
        fn event_type(&self) -> &'static str {
            "Incremented"
        }
    }

    /// Uninhabited: the migration tests never dispatch commands.
    #[derive(Clone)]
    enum CounterCommandV2 {}

    impl CommandPayload for CounterCommandV2 {}

    struct CounterWorkflowV2;
    impl Workflow for CounterWorkflowV2 {
        type State = CounterStateV2;
        type Event = CounterEventV2;
        type Command = CounterCommandV2;

        fn handle(
            _state: &Self::State,
            _cmd: Self::Command,
        ) -> Result<crate::workflow::WorkflowOutput<Self::Event>, crate::error::WorkflowError>
        {
            unreachable!("not used in migration tests")
        }

        fn apply(mut state: Self::State, event: &Self::Event) -> Self::State {
            match event {
                CounterEventV2::Incremented => state.count += 1,
            }
            state
        }

        // Distinct from V1 so the tests can tell which workflow's version
        // ended up on the snapshot.
        fn state_schema_version() -> u32 {
            2
        }
    }

    // ── Migration implementation ──────────────────────────────────────────────

    /// Copies `count` and stamps a fixed label so migrated state is recognisable.
    struct V1ToV2;

    impl StateMigration for V1ToV2 {
        type FromWorkflow = CounterWorkflowV1;
        type ToWorkflow = CounterWorkflowV2;

        fn source_workflow_id(&self) -> &WorkflowId {
            static WID: std::sync::OnceLock<WorkflowId> = std::sync::OnceLock::new();
            WID.get_or_init(|| WorkflowId::new("counter", "FV2024-10-01"))
        }

        fn target_workflow_id(&self) -> &WorkflowId {
            static WID: std::sync::OnceLock<WorkflowId> = std::sync::OnceLock::new();
            WID.get_or_init(|| WorkflowId::new("counter", "FV2025-04-01"))
        }

        fn migrate(&self, state: CounterStateV1) -> Result<CounterStateV2, String> {
            Ok(CounterStateV2 {
                count: state.count,
                label: "migrated".into(),
            })
        }
    }

    // ── Helpers ───────────────────────────────────────────────────────────────

    /// Build one `Incremented` event for `workflow_id`.
    ///
    /// Every call generates fresh process/tenant/correlation/conversation ids;
    /// the runner under test only inspects `workflow_id`, `event_type`,
    /// `schema_version` and `payload`.
    fn make_increment_event(workflow_id: WorkflowId) -> NewEvent {
        let pid = ProcessId::new();
        let tid = TenantId::new();
        NewEvent {
            correlation_id: CorrelationId::new(),
            causation_id: None,
            conversation_id: ConversationId::new(),
            process_id: pid,
            tenant_id: tid,
            workflow_id,
            event_type: "Incremented".into(),
            schema_version: 1,
            payload: serde_json::to_value(CounterEventV1::Incremented).unwrap(),
        }
    }

    // ── Tests ─────────────────────────────────────────────────────────────────

    #[tokio::test]
    async fn migrate_matching_stream() {
        let store = InMemoryEventStore::default();
        let snaps = crate::snapshot::InMemorySnapshotStore::new();
        let sid = StreamId::new("process/counter-001");
        let wid_v1 = WorkflowId::new("counter", "FV2024-10-01");

        // Append 3 increment events under the V1 workflow.
        store
            .append(
                &sid,
                ExpectedVersion::Any,
                &[
                    make_increment_event(wid_v1.clone()),
                    make_increment_event(wid_v1.clone()),
                    make_increment_event(wid_v1.clone()),
                ],
            )
            .await
            .unwrap();

        let runner = MigrationRunner::new(V1ToV2, store, snaps.clone());
        let report = runner.run().await;

        assert!(report.is_ok(), "errors: {:?}", report.errors);
        assert_eq!(report.migrated, 1);
        assert_eq!(report.skipped, 0);

        // The snapshot should encode the migrated V2 state.
        let snap = snaps.load(&sid).await.unwrap().expect("snapshot saved");
        assert_eq!(snap.state_schema_version, 2);
        let state: CounterStateV2 = serde_json::from_value(snap.state).unwrap();
        assert_eq!(state.count, 3);
        assert_eq!(state.label, "migrated");
    }

    #[tokio::test]
    async fn skip_non_matching_stream() {
        let store = InMemoryEventStore::default();
        let snaps = NoopSnapshotStore;
        let sid = StreamId::new("process/counter-other");
        let wid_v2 = WorkflowId::new("counter", "FV2025-04-01"); // already V2

        store
            .append(&sid, ExpectedVersion::Any, &[make_increment_event(wid_v2)])
            .await
            .unwrap();

        let runner = MigrationRunner::new(V1ToV2, store, snaps);
        let report = runner.run().await;

        assert!(report.is_ok());
        assert_eq!(report.migrated, 0);
        assert_eq!(report.skipped, 1);
    }

    #[tokio::test]
    async fn skip_empty_stream() {
        let store = InMemoryEventStore::default();
        let snaps = NoopSnapshotStore;
        // Nothing was ever appended, so this exercises an empty *store*: the
        // runner has no streams to visit and every counter stays at zero.
        let runner = MigrationRunner::new(V1ToV2, store, snaps);
        let report = runner.run().await;

        assert!(report.is_ok());
        assert_eq!(report.migrated, 0);
        assert_eq!(report.skipped, 0);
    }

    #[tokio::test]
    async fn migration_fn_error_is_recorded_not_fatal() {
        struct FailingMigration;

        impl StateMigration for FailingMigration {
            type FromWorkflow = CounterWorkflowV1;
            type ToWorkflow = CounterWorkflowV2;
            fn source_workflow_id(&self) -> &WorkflowId {
                static WID: std::sync::OnceLock<WorkflowId> = std::sync::OnceLock::new();
                WID.get_or_init(|| WorkflowId::new("counter", "FV2024-10-01"))
            }

            fn target_workflow_id(&self) -> &WorkflowId {
                static WID: std::sync::OnceLock<WorkflowId> = std::sync::OnceLock::new();
                WID.get_or_init(|| WorkflowId::new("counter", "FV2025-04-01"))
            }

            fn migrate(&self, _state: CounterStateV1) -> Result<CounterStateV2, String> {
                Err("intentional test failure".into())
            }
        }

        let store = InMemoryEventStore::default();
        let snaps = NoopSnapshotStore;
        let sid = StreamId::new("process/failing");
        let wid_v1 = WorkflowId::new("counter", "FV2024-10-01");

        store
            .append(&sid, ExpectedVersion::Any, &[make_increment_event(wid_v1)])
            .await
            .unwrap();

        let runner = MigrationRunner::new(FailingMigration, store, snaps);
        let report = runner.run().await;

        assert!(!report.is_ok());
        assert_eq!(report.errors.len(), 1);
        assert_eq!(report.migrated, 0);
        assert!(
            report.errors[0]
                .message
                .contains("intentional test failure")
        );
    }

    #[test]
    fn parse_process_stream_id_accepts_tenant_and_process_uuids() {
        let nil = uuid::Uuid::nil();
        let sid = format!("process/{nil}/{nil}");
        assert!(parse_process_stream_id(&sid).is_some());
    }

    #[test]
    fn parse_process_stream_id_rejects_malformed_ids() {
        let nil = uuid::Uuid::nil();
        let malformed = [
            String::new(),
            "process/counter-001".to_owned(),
            format!("process/{nil}"),
            format!("other/{nil}/{nil}"),
            format!("process/{nil}/not-a-uuid"),
            format!("process/not-a-uuid/{nil}"),
            format!("process/{nil}/{nil}/extra"),
        ];

        for sid in &malformed {
            assert!(
                parse_process_stream_id(sid).is_none(),
                "unexpectedly parsed {sid:?}"
            );
        }
    }
}