Skip to main content

mako_engine/
migration.rs

1//! In-flight process state migration across BDEW format-version boundaries.
2//!
3//! When an old BDEW format version (FV) is removed from the adapter registry
4//! after its grace period, any process initiated under that FV can no longer
5//! receive new events — the `ForwardCompatible` policy rejects FV mismatches
6//! at dispatch time. [`StateMigration`] + [`MigrationRunner`] provide the
7//! tooling to advance those processes to a newer FV *before* the old FV is
8//! retired.
9//!
10//! # How it works
11//!
12//! 1. **Scan** all event streams via [`EventStore::list_streams`].
13//! 2. **Filter** streams whose first event carries
14//!    `workflow_id == migration.source_workflow_id()`.
15//! 3. **Replay** each matched stream using `FromWorkflow::apply` to reconstruct
16//!    the fully-folded state.
17//! 4. **Migrate** the state via [`StateMigration::migrate`].
//! 5. **Snapshot** the migrated state, tagged with
//!    `ToWorkflow::state_schema_version()`, so the process can continue executing
//!    under the new workflow definition without replaying old incompatible events.
21//!
22//! # Deployment sequence
23//!
24//! 1. Deploy the new binary with **both** FVs still registered in the adapter
25//!    registry.
26//! 2. Run `MigrationRunner::run_and_update_registry(&registry)` — all in-flight
27//!    processes are migrated and their routing-table entries are rewritten to use
28//!    the new `workflow_id`. Inspect [`MigrationReport`] for errors.
29//! 3. Remove the old FV from the adapter registry and redeploy.
30//!
//! > **Note**: `MigrationRunner::run_and_update_registry` updates the
//! > [`ProcessRegistry`] automatically: for every migrated stream whose ID has the
//! > form `process/{tenant_uuid}/{process_uuid}`, it rewrites the `workflow_id` of
//! > the primary process-keyed [`ProcessIdentity`] entry (streams with any other ID
//! > format are skipped with a warning). Conversation- and correlation-keyed
//! > entries are typically short-lived and are not touched.
//! > If you only want the snapshot migration without registry updates, use the
//! > simpler `MigrationRunner::run()` instead.
37//!
38//! # Example
39//!
40//! ```rust,ignore
41//! use mako_engine::{
42//!     migration::{MigrationRunner, StateMigration},
43//!     version::WorkflowId,
44//! };
45//!
46//! struct SupplierChangeFv2024ToFv2025;
47//!
48//! impl StateMigration for SupplierChangeFv2024ToFv2025 {
49//!     type FromWorkflow = GpkeSupplierChangeWorkflowFv2024;
50//!     type ToWorkflow   = GpkeSupplierChangeWorkflowFv2025;
51//!
52//!     fn source_workflow_id(&self) -> &WorkflowId { &FV2024_WORKFLOW_ID }
53//!     fn target_workflow_id(&self)   -> &WorkflowId { &FV2025_WORKFLOW_ID }
54//!
55//!     fn migrate(
56//!         &self,
57//!         state: SupplierChangeStateFv2024,
58//!     ) -> Result<SupplierChangeStateFv2025, String> {
59//!         Ok(SupplierChangeStateFv2025::from_v2024(state))
60//!     }
61//! }
62//!
63//! let runner = MigrationRunner::new(
64//!     SupplierChangeFv2024ToFv2025,
65//!     event_store,
66//!     snap_store,
67//! );
68//! let report = runner.run().await;
69//! assert!(report.is_ok(), "migration errors: {:?}", report.errors);
70//! ```
71//!
72//! [`ProcessRegistry`]: crate::registry::ProcessRegistry
73//! [`ProcessIdentity`]: crate::ids::ProcessIdentity
74//! [`EventStore::list_streams`]: crate::event_store::EventStore::list_streams
75
76use crate::{
77    event_store::EventStore,
78    ids::{ProcessId, StreamId, TenantId},
79    snapshot::{Snapshot, SnapshotStore},
80    version::WorkflowId,
81    workflow::Workflow,
82};
83
84// ── MigrationError ────────────────────────────────────────────────────────────
85
86/// Describes a failure to migrate a single process stream.
87#[derive(Debug, Clone)]
88pub struct MigrationError {
89    /// The stream that could not be migrated.
90    pub stream_id: StreamId,
91    /// Human-readable failure reason.
92    pub message: String,
93}
94
95impl std::fmt::Display for MigrationError {
96    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
97        write!(
98            f,
99            "migration error on stream {}: {}",
100            self.stream_id, self.message
101        )
102    }
103}
104
105impl std::error::Error for MigrationError {}
106
107// ── StateMigration ────────────────────────────────────────────────────────────
108
109/// A typed, one-directional migration from one workflow version to another.
110///
111/// Implement this trait for each FV-to-FV transition your deployment requires.
112/// The [`migrate`] function is called once per in-flight process stream and must
113/// be **pure** (no I/O, no clock access, no global state).
114///
115/// ## Additive-only changes (same state type)
116///
117/// When both FVs share the same `State` type (only new optional fields added),
118/// the migration can be a no-op:
119///
120/// ```rust,ignore
121/// fn migrate(&self, state: SharedState) -> Result<SharedState, String> {
122///     Ok(state)
123/// }
124/// ```
125///
126/// ## Structural changes (different state types)
127///
128/// When the state layout changed incompatibly (renamed variant, removed field,
129/// changed discriminant), use different `FromWorkflow::State` and
130/// `ToWorkflow::State` types. The Rust compiler will enforce that every field
131/// is explicitly mapped:
132///
133/// ```rust,ignore
134/// fn migrate(&self, old: OldState) -> Result<NewState, String> {
135///     match old {
136///         OldState::Initiated(data) => Ok(NewState::Initiated(NewInitiatedData {
137///             ems_id:    data.ems_id,
138///             malo_id:   data.malo_id,
139///             initiated: data.initiated_at,  // renamed field
140///         })),
141///         OldState::Completed => Ok(NewState::Completed),
142///     }
143/// }
144/// ```
145///
146/// [`migrate`]: StateMigration::migrate
147pub trait StateMigration: Send + Sync + 'static {
148    /// The old workflow definition whose events are stored in matched streams.
149    type FromWorkflow: Workflow;
150    /// The new workflow definition that continues execution after migration.
151    type ToWorkflow: Workflow;
152
153    /// The `WorkflowId` (name + old BDEW FV) that identifies streams to migrate.
154    fn source_workflow_id(&self) -> &WorkflowId;
155
156    /// The `WorkflowId` (name + new BDEW FV) stamped into the migrated snapshot.
157    fn target_workflow_id(&self) -> &WorkflowId;
158
159    /// Map the fully-replayed old state to the new state type.
160    ///
161    /// Called once per matched stream. Must be **pure**.
162    ///
163    /// # Errors
164    ///
165    /// Return `Err(human_readable_reason)` when the state cannot be migrated.
166    /// The runner records the failure in [`MigrationReport::errors`] and
167    /// continues with remaining streams — a single bad stream does not abort
168    /// the entire migration.
169    fn migrate(
170        &self,
171        state: <Self::FromWorkflow as Workflow>::State,
172    ) -> Result<<Self::ToWorkflow as Workflow>::State, String>;
173}
174
175// ── MigrationReport ───────────────────────────────────────────────────────────
176
177/// Summary produced by a [`MigrationRunner::run`] call.
178#[derive(Debug, Default)]
179pub struct MigrationReport {
180    /// Number of streams successfully migrated and snapshotted.
181    pub migrated: usize,
182    /// Number of streams skipped (wrong `workflow_id`, empty, or already migrated).
183    pub skipped: usize,
184    /// Streams that encountered an error during replay, migration, or snapshot.
185    pub errors: Vec<MigrationError>,
186}
187
188impl MigrationReport {
189    /// Return `true` when no migration errors occurred.
190    ///
191    /// A `true` result does not imply that all processes were migrated — only
192    /// that all matched processes succeeded. Check [`migrated`] and [`skipped`]
193    /// to verify expected migration counts.
194    ///
195    /// [`migrated`]: MigrationReport::migrated
196    /// [`skipped`]: MigrationReport::skipped
197    #[must_use]
198    pub fn is_ok(&self) -> bool {
199        self.errors.is_empty()
200    }
201
202    /// Merge another report into this one (accumulate counts and errors).
203    pub fn merge(&mut self, other: MigrationReport) {
204        self.migrated += other.migrated;
205        self.skipped += other.skipped;
206        self.errors.extend(other.errors);
207    }
208}
209
210impl std::fmt::Display for MigrationReport {
211    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
212        write!(
213            f,
214            "MigrationReport {{ migrated: {}, skipped: {}, errors: {} }}",
215            self.migrated,
216            self.skipped,
217            self.errors.len(),
218        )
219    }
220}
221
222// ── IdentityMigration ─────────────────────────────────────────────────────────
223
224/// A no-op [`StateMigration`] for workflows whose state schema did **not** change
225/// between two BDEW format versions.
226///
227/// Use this when the FV transition only added new optional AHB rules, segment
228/// cardinality changes, or code-list entries — no field was renamed, removed,
229/// or made mandatory. The migrated snapshot repoints `workflow_id` to the new
230/// FV while keeping the state value identical.
231///
232/// # Example
233///
234/// ```rust,ignore
235/// use mako_engine::migration::IdentityMigration;
236/// use mako_engine::version::WorkflowId;
237/// use mako_gpke::lf_anmeldung::{GpkeLfAnmeldungWorkflow, WORKFLOW_NAME};
238///
239/// let migration = IdentityMigration::<GpkeLfAnmeldungWorkflow>::new(
240///     WorkflowId::new(WORKFLOW_NAME, "FV2025-10-01"),
241///     WorkflowId::new(WORKFLOW_NAME, "FV2026-10-01"),
242/// );
243/// ```
244pub struct IdentityMigration<W> {
245    source: WorkflowId,
246    target: WorkflowId,
247    _w: std::marker::PhantomData<fn() -> W>,
248}
249
250impl<W: Workflow> IdentityMigration<W> {
251    /// Construct a new identity migration between two format versions.
252    #[must_use]
253    pub fn new(source: WorkflowId, target: WorkflowId) -> Self {
254        Self {
255            source,
256            target,
257            _w: std::marker::PhantomData,
258        }
259    }
260}
261
262impl<W> StateMigration for IdentityMigration<W>
263where
264    W: Workflow + Send + Sync + 'static,
265    W::State: Clone,
266{
267    type FromWorkflow = W;
268    type ToWorkflow = W;
269
270    fn source_workflow_id(&self) -> &WorkflowId {
271        &self.source
272    }
273
274    fn target_workflow_id(&self) -> &WorkflowId {
275        &self.target
276    }
277
278    fn migrate(&self, state: W::State) -> Result<W::State, String> {
279        Ok(state)
280    }
281}
282
283// ── helpers ────────────────────────────────────────────────────────────────────
284
285/// Parse `(tenant_id, process_id)` from a process stream identifier.
286///
287/// Stream IDs for process streams follow the format
288/// `process/{tenant_uuid}/{process_uuid}` (includes a tenant discriminator
289/// for single-tenant isolation).  Returns `None` when the format does not match.
290fn parse_process_stream_id(stream_id: &str) -> Option<(TenantId, ProcessId)> {
291    let rest = stream_id.strip_prefix("process/")?;
292    let (tenant_str, process_str) = rest.split_once('/')?;
293    let tenant_uuid = uuid::Uuid::parse_str(tenant_str).ok()?;
294    let process_uuid = uuid::Uuid::parse_str(process_str).ok()?;
295    Some((
296        TenantId::from_uuid(tenant_uuid),
297        ProcessId::from_uuid(process_uuid),
298    ))
299}
300
301// ── MigrationRunner ───────────────────────────────────────────────────────────
302
/// Drives a [`StateMigration`] over every event stream in a store.
///
/// Constructed with separate [`EventStore`] and [`SnapshotStore`] handles so
/// the runner can operate against any backend combination (e.g. `SlateDbStore`
/// for events and `SlateDbSnapshotStore` for snapshots, or in-memory stores
/// during testing).
///
/// # Cost and concurrency
///
/// `run()` and `run_and_update_registry()` process streams **sequentially**.
/// Both call [`EventStore::list_streams`] without a filter and load each listed
/// stream's events before checking its `workflow_id`. The work therefore grows
/// with the total number of streams in the store, not just the matching ones.
/// The runner currently offers no way to narrow that scan. For deployments with
/// thousands of in-flight processes, run it as a dedicated one-off task.
pub struct MigrationRunner<M, ES, SS> {
    migration: M,
    event_store: ES,
    snap_store: SS,
}
321
322impl<M, ES, SS> MigrationRunner<M, ES, SS>
323where
324    M: StateMigration,
325    <M::FromWorkflow as Workflow>::State: serde::de::DeserializeOwned,
326    <M::ToWorkflow as Workflow>::State: serde::Serialize,
327    ES: EventStore,
328    SS: SnapshotStore,
329{
330    /// Construct a new runner.
331    #[must_use]
332    pub fn new(migration: M, event_store: ES, snap_store: SS) -> Self {
333        Self {
334            migration,
335            event_store,
336            snap_store,
337        }
338    }
339
340    /// Scan all event streams and migrate those that match `source_workflow_id`.
341    ///
342    /// - Streams with no events or a different `workflow_id` are counted in
343    ///   [`MigrationReport::skipped`].
344    /// - Streams that fail (replay error, `migrate()` returning `Err`, or
345    ///   snapshot write failure) are recorded in [`MigrationReport::errors`]
346    ///   and do **not** abort the run.
347    ///
348    /// If `list_streams` itself fails, a single error entry covering
349    /// `"(list_streams)"` is returned immediately.
350    pub async fn run(&self) -> MigrationReport {
351        let streams = match self.event_store.list_streams(None).await {
352            Ok(s) => s,
353            Err(e) => {
354                return MigrationReport {
355                    errors: vec![MigrationError {
356                        stream_id: StreamId::new("(list_streams)"),
357                        message: format!("list_streams failed: {e}"),
358                    }],
359                    ..Default::default()
360                };
361            }
362        };
363
364        let mut report = MigrationReport::default();
365
366        for stream_id in streams {
367            match self.migrate_stream(&stream_id).await {
368                Ok(true) => report.migrated += 1,
369                Ok(false) => report.skipped += 1,
370                Err(err) => report.errors.push(err),
371            }
372        }
373
374        report
375    }
376
377    /// Like [`run`] but also updates [`ProcessRegistry`] entries after each
378    /// successful migration.
379    ///
380    /// For every migrated stream the runner:
381    ///
382    /// 1. Parses `(tenant_id, process_id)` from the stream ID
383    ///    (`process/{tenant_id}/{process_id}`).
384    /// 2. Looks up `RegistryKey::from_process(process_id)` for that tenant.
385    /// 3. Rewrites the stored [`ProcessIdentity`] with the new `workflow_id`
386    ///    and updated `stream_id` (which embeds the tenant discriminator).
387    ///
388    /// Entries for conversation- or correlation-based routing keys are
389    /// typically short-lived (they exist only during an active EDIFACT
390    /// exchange) and do not need updating here.
391    ///
392    /// Registry update failures are recorded as warnings in the returned
393    /// [`MigrationReport`] but do **not** roll back the snapshot that was
394    /// already written.
395    ///
396    /// [`run`]: MigrationRunner::run
397    /// [`ProcessRegistry`]: crate::registry::ProcessRegistry
398    /// [`ProcessIdentity`]: crate::ids::ProcessIdentity
399    pub async fn run_and_update_registry<R>(&self, registry: &R) -> MigrationReport
400    where
401        R: crate::registry::ProcessRegistry,
402    {
403        let streams = match self.event_store.list_streams(None).await {
404            Ok(s) => s,
405            Err(e) => {
406                return MigrationReport {
407                    errors: vec![MigrationError {
408                        stream_id: StreamId::new("(list_streams)"),
409                        message: format!("list_streams failed: {e}"),
410                    }],
411                    ..Default::default()
412                };
413            }
414        };
415
416        let mut report = MigrationReport::default();
417
418        for stream_id in streams {
419            match self.migrate_stream(&stream_id).await {
420                Ok(false) => {
421                    report.skipped += 1;
422                }
423                Ok(true) => {
424                    report.migrated += 1;
425                    // Best-effort registry update: parse tenant + process from
426                    // the stream ID and rewrite the primary process-keyed entry.
427                    if let Some((tenant_id, process_id)) =
428                        parse_process_stream_id(stream_id.as_str())
429                    {
430                        let key = crate::registry::RegistryKey::from_process(process_id);
431                        match registry.lookup(tenant_id, &key).await {
432                            Ok(Some(mut identity)) => {
433                                // Rebind workflow_id to the new version; the
434                                // stream_id is already correct (unchanged by
435                                // migration).
436                                identity.workflow_id = self.migration.target_workflow_id().clone();
437                                if let Err(e) = registry.register(tenant_id, &key, identity).await {
438                                    report.errors.push(MigrationError {
439                                        stream_id: stream_id.clone(),
440                                        message: format!(
441                                            "registry update failed for process {process_id}: {e}"
442                                        ),
443                                    });
444                                }
445                            }
446                            Ok(None) => {
447                                // No direct-process registry entry — process
448                                // might only be accessible via conversation/
449                                // correlation keys; nothing to do here.
450                            }
451                            Err(e) => {
452                                report.errors.push(MigrationError {
453                                    stream_id: stream_id.clone(),
454                                    message: format!(
455                                        "registry lookup failed for process {process_id}: {e}"
456                                    ),
457                                });
458                            }
459                        }
460                    } else {
461                        tracing::warn!(
462                            stream_id = stream_id.as_str(),
463                            "run_and_update_registry: cannot parse tenant/process from \
464                             stream_id — registry update skipped for this stream",
465                        );
466                    }
467                }
468                Err(err) => {
469                    report.errors.push(err);
470                }
471            }
472        }
473
474        report
475    }
476
477    /// Attempt to migrate a single stream.
478    ///
479    /// Returns `Ok(true)` when the stream was migrated, `Ok(false)` when it
480    /// was skipped.
481    async fn migrate_stream(&self, stream_id: &StreamId) -> Result<bool, MigrationError> {
482        // Load all events. We need them for both the workflow_id check (peek
483        // the first event) and the fold. A cursor-based alternative would
484        // require two round-trips; load_all is acceptable for a one-time
485        // migration operation.
486        let events = self
487            .event_store
488            .load(stream_id)
489            .await
490            .map_err(|e| MigrationError {
491                stream_id: stream_id.clone(),
492                message: format!("event load failed: {e}"),
493            })?;
494
495        let Some(first) = events.first() else {
496            // Empty stream — nothing to migrate.
497            return Ok(false);
498        };
499
500        if &first.workflow_id != self.migration.source_workflow_id() {
501            // Different workflow — skip.
502            return Ok(false);
503        }
504
505        // Fold state using FromWorkflow.
506        let mut state = <M::FromWorkflow as Workflow>::State::default();
507        let last_seq = events.last().map_or(0, |e| e.sequence_number);
508
509        for env in events {
510            let payload = M::FromWorkflow::upcast(&env.event_type, env.schema_version, env.payload)
511                .map_err(|e| MigrationError {
512                    stream_id: stream_id.clone(),
513                    message: format!("upcast failed on seq {}: {e}", env.sequence_number),
514                })?;
515            let event: <M::FromWorkflow as Workflow>::Event = serde_json::from_value(payload)
516                .map_err(|e| MigrationError {
517                    stream_id: stream_id.clone(),
518                    message: format!("event deserialize failed: {e}"),
519                })?;
520            state = M::FromWorkflow::apply(state, &event);
521        }
522
523        // Apply the user-supplied migration function.
524        let new_state = self
525            .migration
526            .migrate(state)
527            .map_err(|msg| MigrationError {
528                stream_id: stream_id.clone(),
529                message: msg,
530            })?;
531
532        // Serialize the migrated state.
533        let payload = serde_json::to_value(&new_state).map_err(|e| MigrationError {
534            stream_id: stream_id.clone(),
535            message: format!("state serialization failed: {e}"),
536        })?;
537
538        // Save a snapshot under the new workflow's schema version. Future
539        // Process::<ToWorkflow>::state_with_snapshot() calls will load this
540        // snapshot and skip the old (incompatible) events entirely.
541        let snap = Snapshot::new(
542            stream_id.clone(),
543            last_seq,
544            M::ToWorkflow::state_schema_version(),
545            payload,
546        );
547        self.snap_store
548            .save(&snap)
549            .await
550            .map_err(|e| MigrationError {
551                stream_id: stream_id.clone(),
552                message: format!("snapshot save failed: {e}"),
553            })?;
554
555        Ok(true)
556    }
557}
558
559// ── Tests ─────────────────────────────────────────────────────────────────────
560
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        envelope::NewEvent,
        event_store::{ExpectedVersion, InMemoryEventStore},
        ids::{ConversationId, CorrelationId, ProcessId, StreamId, TenantId},
        snapshot::NoopSnapshotStore,
        version::WorkflowId,
        workflow::{CommandPayload, EventPayload, Workflow},
    };

    // ── Minimal "counter" workflows for test purposes ─────────────────────────

    #[derive(Default, Clone, serde::Serialize, serde::Deserialize, PartialEq, Debug)]
    struct CounterStateV1 {
        count: u32,
    }

    #[derive(Clone, serde::Serialize, serde::Deserialize)]
    enum CounterEventV1 {
        Incremented,
    }

    impl EventPayload for CounterEventV1 {
        fn event_type(&self) -> &'static str {
            "Incremented"
        }
    }

    #[derive(Clone)]
    enum CounterCommandV1 {}

    impl CommandPayload for CounterCommandV1 {}

    struct CounterWorkflowV1;
    impl Workflow for CounterWorkflowV1 {
        type State = CounterStateV1;
        type Event = CounterEventV1;
        type Command = CounterCommandV1;

        fn handle(
            _state: &Self::State,
            _cmd: Self::Command,
        ) -> Result<crate::workflow::WorkflowOutput<Self::Event>, crate::error::WorkflowError>
        {
            unreachable!("not used in migration tests")
        }

        fn apply(mut state: Self::State, event: &Self::Event) -> Self::State {
            match event {
                CounterEventV1::Incremented => state.count += 1,
            }
            state
        }
    }

    // V2 adds a `label` field.
    #[derive(Default, Clone, serde::Serialize, serde::Deserialize, PartialEq, Debug)]
    struct CounterStateV2 {
        count: u32,
        label: String,
    }

    #[derive(Clone, serde::Serialize, serde::Deserialize)]
    enum CounterEventV2 {
        Incremented,
    }

    impl EventPayload for CounterEventV2 {
        fn event_type(&self) -> &'static str {
            "Incremented"
        }
    }

    #[derive(Clone)]
    enum CounterCommandV2 {}

    impl CommandPayload for CounterCommandV2 {}

    struct CounterWorkflowV2;
    impl Workflow for CounterWorkflowV2 {
        type State = CounterStateV2;
        type Event = CounterEventV2;
        type Command = CounterCommandV2;

        fn handle(
            _state: &Self::State,
            _cmd: Self::Command,
        ) -> Result<crate::workflow::WorkflowOutput<Self::Event>, crate::error::WorkflowError>
        {
            unreachable!("not used in migration tests")
        }

        fn apply(mut state: Self::State, event: &Self::Event) -> Self::State {
            match event {
                CounterEventV2::Incremented => state.count += 1,
            }
            state
        }

        fn state_schema_version() -> u32 {
            2
        }
    }

    // ── Migration implementation ──────────────────────────────────────────────

    struct V1ToV2;

    impl StateMigration for V1ToV2 {
        type FromWorkflow = CounterWorkflowV1;
        type ToWorkflow = CounterWorkflowV2;

        fn source_workflow_id(&self) -> &WorkflowId {
            static WID: std::sync::OnceLock<WorkflowId> = std::sync::OnceLock::new();
            WID.get_or_init(|| WorkflowId::new("counter", "FV2024-10-01"))
        }

        fn target_workflow_id(&self) -> &WorkflowId {
            static WID: std::sync::OnceLock<WorkflowId> = std::sync::OnceLock::new();
            WID.get_or_init(|| WorkflowId::new("counter", "FV2025-04-01"))
        }

        fn migrate(&self, state: CounterStateV1) -> Result<CounterStateV2, String> {
            Ok(CounterStateV2 {
                count: state.count,
                label: "migrated".into(),
            })
        }
    }

    // ── Helpers ───────────────────────────────────────────────────────────────

    /// Build one `Incremented` event stamped with `workflow_id`.
    ///
    /// Each call uses fresh process/tenant IDs. The runner only reads the
    /// workflow ID, event type, schema version, payload and sequence number, so
    /// these IDs do not affect the migration tests.
    fn make_increment_event(workflow_id: WorkflowId) -> NewEvent {
        let pid = ProcessId::new();
        let tid = TenantId::new();
        NewEvent {
            correlation_id: CorrelationId::new(),
            causation_id: None,
            conversation_id: ConversationId::new(),
            process_id: pid,
            tenant_id: tid,
            workflow_id,
            event_type: "Incremented".into(),
            schema_version: 1,
            payload: serde_json::to_value(CounterEventV1::Incremented).unwrap(),
        }
    }

    // ── Tests ─────────────────────────────────────────────────────────────────

    #[tokio::test]
    async fn migrate_matching_stream() {
        let store = InMemoryEventStore::default();
        let snaps = crate::snapshot::InMemorySnapshotStore::new();
        let sid = StreamId::new("process/counter-001");
        let wid_v1 = WorkflowId::new("counter", "FV2024-10-01");

        // Append 3 increment events under the V1 workflow.
        store
            .append(
                &sid,
                ExpectedVersion::Any,
                &[
                    make_increment_event(wid_v1.clone()),
                    make_increment_event(wid_v1.clone()),
                    make_increment_event(wid_v1.clone()),
                ],
            )
            .await
            .unwrap();

        let runner = MigrationRunner::new(V1ToV2, store, snaps.clone());
        let report = runner.run().await;

        assert!(report.is_ok(), "errors: {:?}", report.errors);
        assert_eq!(report.migrated, 1);
        assert_eq!(report.skipped, 0);

        // The snapshot should encode the migrated V2 state.
        let snap = snaps.load(&sid).await.unwrap().expect("snapshot saved");
        assert_eq!(snap.state_schema_version, 2);
        let state: CounterStateV2 = serde_json::from_value(snap.state).unwrap();
        assert_eq!(state.count, 3);
        assert_eq!(state.label, "migrated");
    }

    #[tokio::test]
    async fn skip_non_matching_stream() {
        let store = InMemoryEventStore::default();
        let snaps = NoopSnapshotStore;
        let sid = StreamId::new("process/counter-other");
        let wid_v2 = WorkflowId::new("counter", "FV2025-04-01"); // already V2

        store
            .append(&sid, ExpectedVersion::Any, &[make_increment_event(wid_v2)])
            .await
            .unwrap();

        let runner = MigrationRunner::new(V1ToV2, store, snaps);
        let report = runner.run().await;

        assert!(report.is_ok());
        assert_eq!(report.migrated, 0);
        assert_eq!(report.skipped, 1);
    }

    #[tokio::test]
    async fn empty_store_produces_empty_report() {
        let store = InMemoryEventStore::default();
        let snaps = NoopSnapshotStore;
        // list_streams returns an empty list for an unused store.
        let runner = MigrationRunner::new(V1ToV2, store, snaps);
        let report = runner.run().await;

        assert!(report.is_ok());
        assert_eq!(report.migrated, 0);
        assert_eq!(report.skipped, 0);
    }

    #[tokio::test]
    async fn migration_fn_error_is_recorded_not_fatal() {
        struct FailingMigration;

        impl StateMigration for FailingMigration {
            type FromWorkflow = CounterWorkflowV1;
            type ToWorkflow = CounterWorkflowV2;
            fn source_workflow_id(&self) -> &WorkflowId {
                static WID: std::sync::OnceLock<WorkflowId> = std::sync::OnceLock::new();
                WID.get_or_init(|| WorkflowId::new("counter", "FV2024-10-01"))
            }

            fn target_workflow_id(&self) -> &WorkflowId {
                static WID: std::sync::OnceLock<WorkflowId> = std::sync::OnceLock::new();
                WID.get_or_init(|| WorkflowId::new("counter", "FV2025-04-01"))
            }

            fn migrate(&self, _state: CounterStateV1) -> Result<CounterStateV2, String> {
                Err("intentional test failure".into())
            }
        }

        let store = InMemoryEventStore::default();
        let snaps = NoopSnapshotStore;
        let sid = StreamId::new("process/failing");
        let wid_v1 = WorkflowId::new("counter", "FV2024-10-01");

        store
            .append(&sid, ExpectedVersion::Any, &[make_increment_event(wid_v1)])
            .await
            .unwrap();

        let runner = MigrationRunner::new(FailingMigration, store, snaps);
        let report = runner.run().await;

        assert!(!report.is_ok());
        assert_eq!(report.errors.len(), 1);
        assert_eq!(report.migrated, 0);
        assert!(
            report.errors[0]
                .message
                .contains("intentional test failure")
        );
    }

    #[tokio::test]
    async fn identity_migration_keeps_state_unchanged() {
        let store = InMemoryEventStore::default();
        let snaps = crate::snapshot::InMemorySnapshotStore::new();
        let sid = StreamId::new("process/identity-001");
        let wid_v1 = WorkflowId::new("counter", "FV2024-10-01");

        store
            .append(
                &sid,
                ExpectedVersion::Any,
                &[
                    make_increment_event(wid_v1.clone()),
                    make_increment_event(wid_v1.clone()),
                ],
            )
            .await
            .unwrap();

        let migration = IdentityMigration::<CounterWorkflowV1>::new(
            wid_v1,
            WorkflowId::new("counter", "FV2025-04-01"),
        );
        let runner = MigrationRunner::new(migration, store, snaps.clone());
        let report = runner.run().await;

        assert!(report.is_ok(), "errors: {:?}", report.errors);
        assert_eq!(report.migrated, 1);
        assert_eq!(report.skipped, 0);

        // Same workflow on both sides: same schema version, same state value.
        let snap = snaps.load(&sid).await.unwrap().expect("snapshot saved");
        assert_eq!(
            snap.state_schema_version,
            CounterWorkflowV1::state_schema_version()
        );
        let state: CounterStateV1 = serde_json::from_value(snap.state).unwrap();
        assert_eq!(state, CounterStateV1 { count: 2 });
    }

    #[test]
    fn parse_process_stream_id_accepts_only_tenant_process_format() {
        let tenant = "00000000-0000-0000-0000-000000000001";
        let process = "67e55044-10b1-426f-9247-bb680e5fe0c8";

        assert!(parse_process_stream_id(&format!("process/{tenant}/{process}")).is_some());
        // Wrong prefix.
        assert!(parse_process_stream_id(&format!("stream/{tenant}/{process}")).is_none());
        // Missing process segment.
        assert!(parse_process_stream_id(&format!("process/{tenant}")).is_none());
        // Extra trailing segment makes the process part an invalid UUID.
        assert!(parse_process_stream_id(&format!("process/{tenant}/{process}/x")).is_none());
        // Non-UUID segments (the format used by the runner tests above).
        assert!(parse_process_stream_id("process/counter-001").is_none());
    }

    #[test]
    fn report_merge_accumulates_and_displays_counts() {
        let mut report = MigrationReport {
            migrated: 1,
            skipped: 2,
            errors: Vec::new(),
        };
        assert!(report.is_ok());

        report.merge(MigrationReport {
            migrated: 3,
            skipped: 4,
            errors: vec![MigrationError {
                stream_id: StreamId::new("process/x"),
                message: "boom".into(),
            }],
        });

        assert_eq!(report.migrated, 4);
        assert_eq!(report.skipped, 6);
        assert_eq!(report.errors.len(), 1);
        assert!(!report.is_ok());
        assert_eq!(
            report.to_string(),
            "MigrationReport { migrated: 4, skipped: 6, errors: 1 }"
        );
    }
}