mako_engine/migration.rs
1//! In-flight process state migration across BDEW format-version boundaries.
2//!
3//! When an old BDEW format version (FV) is removed from the adapter registry
4//! after its grace period, any process initiated under that FV can no longer
5//! receive new events — the `ForwardCompatible` policy rejects FV mismatches
6//! at dispatch time. [`StateMigration`] + [`MigrationRunner`] provide the
7//! tooling to advance those processes to a newer FV *before* the old FV is
8//! retired.
9//!
10//! # How it works
11//!
12//! 1. **Scan** all event streams via [`EventStore::list_streams`].
13//! 2. **Filter** streams whose first event carries
14//! `workflow_id == migration.source_workflow_id()`.
15//! 3. **Replay** each matched stream using `FromWorkflow::apply` to reconstruct
16//! the fully-folded state.
17//! 4. **Migrate** the state via [`StateMigration::migrate`].
//! 5. **Snapshot** the migrated state under `ToWorkflow`'s state schema version so
//!    the process can continue executing under the new workflow definition without
//!    replaying old incompatible events.
21//!
22//! # Deployment sequence
23//!
24//! 1. Deploy the new binary with **both** FVs still registered in the adapter
25//! registry.
//! 2. Run `MigrationRunner::run_and_update_registry(&registry)` — all in-flight
27//! processes are migrated and their routing-table entries are rewritten to use
28//! the new `workflow_id`. Inspect [`MigrationReport`] for errors.
29//! 3. Remove the old FV from the adapter registry and redeploy.
30//!
31//! > **Note**: `MigrationRunner::run_and_update_registry` handles the
32//! > `ProcessRegistry` update automatically (updating `ProcessIdentity.workflow_id`
33//! > for every primary process-keyed entry). For conversation- or correlation-keyed
34//! > entries (which are short-lived and self-expire) no action is needed.
35//! > If you only want the snapshot migration without registry updates, use the
36//! > simpler `MigrationRunner::run()` instead.
37//!
38//! # Example
39//!
40//! ```rust,ignore
41//! use mako_engine::{
42//! migration::{MigrationRunner, StateMigration},
43//! version::WorkflowId,
44//! };
45//!
46//! struct SupplierChangeFv2024ToFv2025;
47//!
48//! impl StateMigration for SupplierChangeFv2024ToFv2025 {
49//! type FromWorkflow = GpkeSupplierChangeWorkflowFv2024;
50//! type ToWorkflow = GpkeSupplierChangeWorkflowFv2025;
51//!
52//! fn source_workflow_id(&self) -> &WorkflowId { &FV2024_WORKFLOW_ID }
53//! fn target_workflow_id(&self) -> &WorkflowId { &FV2025_WORKFLOW_ID }
54//!
55//! fn migrate(
56//! &self,
57//! state: SupplierChangeStateFv2024,
58//! ) -> Result<SupplierChangeStateFv2025, String> {
59//! Ok(SupplierChangeStateFv2025::from_v2024(state))
60//! }
61//! }
62//!
63//! let runner = MigrationRunner::new(
64//! SupplierChangeFv2024ToFv2025,
65//! event_store,
66//! snap_store,
67//! );
68//! let report = runner.run().await;
69//! assert!(report.is_ok(), "migration errors: {:?}", report.errors);
70//! ```
71//!
72//! [`ProcessRegistry`]: crate::registry::ProcessRegistry
73//! [`ProcessIdentity`]: crate::ids::ProcessIdentity
74//! [`EventStore::list_streams`]: crate::event_store::EventStore::list_streams
75
76use crate::{
77 event_store::EventStore,
78 ids::{ProcessId, StreamId, TenantId},
79 snapshot::{Snapshot, SnapshotStore},
80 version::WorkflowId,
81 workflow::Workflow,
82};
83
84// ── MigrationError ────────────────────────────────────────────────────────────
85
86/// Describes a failure to migrate a single process stream.
87#[derive(Debug, Clone)]
88pub struct MigrationError {
89 /// The stream that could not be migrated.
90 pub stream_id: StreamId,
91 /// Human-readable failure reason.
92 pub message: String,
93}
94
95impl std::fmt::Display for MigrationError {
96 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
97 write!(
98 f,
99 "migration error on stream {}: {}",
100 self.stream_id, self.message
101 )
102 }
103}
104
105impl std::error::Error for MigrationError {}
106
107// ── StateMigration ────────────────────────────────────────────────────────────
108
109/// A typed, one-directional migration from one workflow version to another.
110///
111/// Implement this trait for each FV-to-FV transition your deployment requires.
112/// The [`migrate`] function is called once per in-flight process stream and must
113/// be **pure** (no I/O, no clock access, no global state).
114///
115/// ## Additive-only changes (same state type)
116///
117/// When both FVs share the same `State` type (only new optional fields added),
118/// the migration can be a no-op:
119///
120/// ```rust,ignore
121/// fn migrate(&self, state: SharedState) -> Result<SharedState, String> {
122/// Ok(state)
123/// }
124/// ```
125///
126/// ## Structural changes (different state types)
127///
128/// When the state layout changed incompatibly (renamed variant, removed field,
129/// changed discriminant), use different `FromWorkflow::State` and
130/// `ToWorkflow::State` types. The Rust compiler will enforce that every field
131/// is explicitly mapped:
132///
133/// ```rust,ignore
134/// fn migrate(&self, old: OldState) -> Result<NewState, String> {
135/// match old {
136/// OldState::Initiated(data) => Ok(NewState::Initiated(NewInitiatedData {
137/// ems_id: data.ems_id,
138/// malo_id: data.malo_id,
139/// initiated: data.initiated_at, // renamed field
140/// })),
141/// OldState::Completed => Ok(NewState::Completed),
142/// }
143/// }
144/// ```
145///
146/// [`migrate`]: StateMigration::migrate
147pub trait StateMigration: Send + Sync + 'static {
148 /// The old workflow definition whose events are stored in matched streams.
149 type FromWorkflow: Workflow;
150 /// The new workflow definition that continues execution after migration.
151 type ToWorkflow: Workflow;
152
153 /// The `WorkflowId` (name + old BDEW FV) that identifies streams to migrate.
154 fn source_workflow_id(&self) -> &WorkflowId;
155
156 /// The `WorkflowId` (name + new BDEW FV) stamped into the migrated snapshot.
157 fn target_workflow_id(&self) -> &WorkflowId;
158
159 /// Map the fully-replayed old state to the new state type.
160 ///
161 /// Called once per matched stream. Must be **pure**.
162 ///
163 /// # Errors
164 ///
165 /// Return `Err(human_readable_reason)` when the state cannot be migrated.
166 /// The runner records the failure in [`MigrationReport::errors`] and
167 /// continues with remaining streams — a single bad stream does not abort
168 /// the entire migration.
169 fn migrate(
170 &self,
171 state: <Self::FromWorkflow as Workflow>::State,
172 ) -> Result<<Self::ToWorkflow as Workflow>::State, String>;
173}
174
175// ── MigrationReport ───────────────────────────────────────────────────────────
176
177/// Summary produced by a [`MigrationRunner::run`] call.
178#[derive(Debug, Default)]
179pub struct MigrationReport {
180 /// Number of streams successfully migrated and snapshotted.
181 pub migrated: usize,
182 /// Number of streams skipped (wrong `workflow_id`, empty, or already migrated).
183 pub skipped: usize,
184 /// Streams that encountered an error during replay, migration, or snapshot.
185 pub errors: Vec<MigrationError>,
186}
187
188impl MigrationReport {
189 /// Return `true` when no migration errors occurred.
190 ///
191 /// A `true` result does not imply that all processes were migrated — only
192 /// that all matched processes succeeded. Check [`migrated`] and [`skipped`]
193 /// to verify expected migration counts.
194 ///
195 /// [`migrated`]: MigrationReport::migrated
196 /// [`skipped`]: MigrationReport::skipped
197 #[must_use]
198 pub fn is_ok(&self) -> bool {
199 self.errors.is_empty()
200 }
201}
202
203impl std::fmt::Display for MigrationReport {
204 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
205 write!(
206 f,
207 "MigrationReport {{ migrated: {}, skipped: {}, errors: {} }}",
208 self.migrated,
209 self.skipped,
210 self.errors.len(),
211 )
212 }
213}
214
215// ── helpers ────────────────────────────────────────────────────────────────────
216
217/// Parse `(tenant_id, process_id)` from a process stream identifier.
218///
219/// Stream IDs for process streams follow the format
220/// `process/{tenant_uuid}/{process_uuid}` (includes a tenant discriminator
221/// for single-tenant isolation). Returns `None` when the format does not match.
222fn parse_process_stream_id(stream_id: &str) -> Option<(TenantId, ProcessId)> {
223 let rest = stream_id.strip_prefix("process/")?;
224 let (tenant_str, process_str) = rest.split_once('/')?;
225 let tenant_uuid = uuid::Uuid::parse_str(tenant_str).ok()?;
226 let process_uuid = uuid::Uuid::parse_str(process_str).ok()?;
227 Some((
228 TenantId::from_uuid(tenant_uuid),
229 ProcessId::from_uuid(process_uuid),
230 ))
231}
232
233// ── MigrationRunner ───────────────────────────────────────────────────────────
234
/// Applies one [`StateMigration`] to every matching event stream of a store.
///
/// The event store and the snapshot store are passed in as two independent
/// handles, so any pairing of [`EventStore`] and [`SnapshotStore`]
/// implementations can be used (for instance in-memory stores in tests).
///
/// # Concurrency
///
/// Streams are processed strictly **one after another**, and both entry
/// points enumerate streams with `list_streams(None)`, i.e. without
/// narrowing the scan. With thousands of in-flight processes a pass can
/// therefore take a while; consider running it from a dedicated migration
/// task.
pub struct MigrationRunner<M, ES, SS> {
    /// The state conversion applied to each matched stream.
    migration: M,
    /// Source of the stream list and of each stream's events.
    event_store: ES,
    /// Destination for the migrated-state snapshots.
    snap_store: SS,
}
253
254impl<M, ES, SS> MigrationRunner<M, ES, SS>
255where
256 M: StateMigration,
257 <M::FromWorkflow as Workflow>::State: serde::de::DeserializeOwned,
258 <M::ToWorkflow as Workflow>::State: serde::Serialize,
259 ES: EventStore,
260 SS: SnapshotStore,
261{
262 /// Construct a new runner.
263 #[must_use]
264 pub fn new(migration: M, event_store: ES, snap_store: SS) -> Self {
265 Self {
266 migration,
267 event_store,
268 snap_store,
269 }
270 }
271
272 /// Scan all event streams and migrate those that match `source_workflow_id`.
273 ///
274 /// - Streams with no events or a different `workflow_id` are counted in
275 /// [`MigrationReport::skipped`].
276 /// - Streams that fail (replay error, `migrate()` returning `Err`, or
277 /// snapshot write failure) are recorded in [`MigrationReport::errors`]
278 /// and do **not** abort the run.
279 ///
280 /// If `list_streams` itself fails, a single error entry covering
281 /// `"(list_streams)"` is returned immediately.
282 pub async fn run(&self) -> MigrationReport {
283 let streams = match self.event_store.list_streams(None).await {
284 Ok(s) => s,
285 Err(e) => {
286 return MigrationReport {
287 errors: vec![MigrationError {
288 stream_id: StreamId::new("(list_streams)"),
289 message: format!("list_streams failed: {e}"),
290 }],
291 ..Default::default()
292 };
293 }
294 };
295
296 let mut report = MigrationReport::default();
297
298 for stream_id in streams {
299 match self.migrate_stream(&stream_id).await {
300 Ok(true) => report.migrated += 1,
301 Ok(false) => report.skipped += 1,
302 Err(err) => report.errors.push(err),
303 }
304 }
305
306 report
307 }
308
309 /// Like [`run`] but also updates [`ProcessRegistry`] entries after each
310 /// successful migration.
311 ///
312 /// For every migrated stream the runner:
313 ///
314 /// 1. Parses `(tenant_id, process_id)` from the stream ID
315 /// (`process/{tenant_id}/{process_id}`).
316 /// 2. Looks up `RegistryKey::from_process(process_id)` for that tenant.
317 /// 3. Rewrites the stored [`ProcessIdentity`] with the new `workflow_id`
318 /// and updated `stream_id` (which embeds the tenant discriminator).
319 ///
320 /// Entries for conversation- or correlation-based routing keys are
321 /// typically short-lived (they exist only during an active EDIFACT
322 /// exchange) and do not need updating here.
323 ///
324 /// Registry update failures are recorded as warnings in the returned
325 /// [`MigrationReport`] but do **not** roll back the snapshot that was
326 /// already written.
327 ///
328 /// [`run`]: MigrationRunner::run
329 /// [`ProcessRegistry`]: crate::registry::ProcessRegistry
330 /// [`ProcessIdentity`]: crate::ids::ProcessIdentity
331 pub async fn run_and_update_registry<R>(&self, registry: &R) -> MigrationReport
332 where
333 R: crate::registry::ProcessRegistry,
334 {
335 let streams = match self.event_store.list_streams(None).await {
336 Ok(s) => s,
337 Err(e) => {
338 return MigrationReport {
339 errors: vec![MigrationError {
340 stream_id: StreamId::new("(list_streams)"),
341 message: format!("list_streams failed: {e}"),
342 }],
343 ..Default::default()
344 };
345 }
346 };
347
348 let mut report = MigrationReport::default();
349
350 for stream_id in streams {
351 match self.migrate_stream(&stream_id).await {
352 Ok(false) => {
353 report.skipped += 1;
354 }
355 Ok(true) => {
356 report.migrated += 1;
357 // Best-effort registry update: parse tenant + process from
358 // the stream ID and rewrite the primary process-keyed entry.
359 if let Some((tenant_id, process_id)) =
360 parse_process_stream_id(stream_id.as_str())
361 {
362 let key = crate::registry::RegistryKey::from_process(process_id);
363 match registry.lookup(tenant_id, &key).await {
364 Ok(Some(mut identity)) => {
365 // Rebind workflow_id to the new version; the
366 // stream_id is already correct (unchanged by
367 // migration).
368 identity.workflow_id = self.migration.target_workflow_id().clone();
369 if let Err(e) = registry.register(tenant_id, &key, identity).await {
370 report.errors.push(MigrationError {
371 stream_id: stream_id.clone(),
372 message: format!(
373 "registry update failed for process {process_id}: {e}"
374 ),
375 });
376 }
377 }
378 Ok(None) => {
379 // No direct-process registry entry — process
380 // might only be accessible via conversation/
381 // correlation keys; nothing to do here.
382 }
383 Err(e) => {
384 report.errors.push(MigrationError {
385 stream_id: stream_id.clone(),
386 message: format!(
387 "registry lookup failed for process {process_id}: {e}"
388 ),
389 });
390 }
391 }
392 } else {
393 tracing::warn!(
394 stream_id = stream_id.as_str(),
395 "run_and_update_registry: cannot parse tenant/process from \
396 stream_id — registry update skipped for this stream",
397 );
398 }
399 }
400 Err(err) => {
401 report.errors.push(err);
402 }
403 }
404 }
405
406 report
407 }
408
409 /// Attempt to migrate a single stream.
410 ///
411 /// Returns `Ok(true)` when the stream was migrated, `Ok(false)` when it
412 /// was skipped.
413 async fn migrate_stream(&self, stream_id: &StreamId) -> Result<bool, MigrationError> {
414 // Load all events. We need them for both the workflow_id check (peek
415 // the first event) and the fold. A cursor-based alternative would
416 // require two round-trips; load_all is acceptable for a one-time
417 // migration operation.
418 let events = self
419 .event_store
420 .load(stream_id)
421 .await
422 .map_err(|e| MigrationError {
423 stream_id: stream_id.clone(),
424 message: format!("event load failed: {e}"),
425 })?;
426
427 let Some(first) = events.first() else {
428 // Empty stream — nothing to migrate.
429 return Ok(false);
430 };
431
432 if &first.workflow_id != self.migration.source_workflow_id() {
433 // Different workflow — skip.
434 return Ok(false);
435 }
436
437 // Fold state using FromWorkflow.
438 let mut state = <M::FromWorkflow as Workflow>::State::default();
439 let last_seq = events.last().map_or(0, |e| e.sequence_number);
440
441 for env in events {
442 let payload = M::FromWorkflow::upcast(&env.event_type, env.schema_version, env.payload)
443 .map_err(|e| MigrationError {
444 stream_id: stream_id.clone(),
445 message: format!("upcast failed on seq {}: {e}", env.sequence_number),
446 })?;
447 let event: <M::FromWorkflow as Workflow>::Event = serde_json::from_value(payload)
448 .map_err(|e| MigrationError {
449 stream_id: stream_id.clone(),
450 message: format!("event deserialize failed: {e}"),
451 })?;
452 state = M::FromWorkflow::apply(state, &event);
453 }
454
455 // Apply the user-supplied migration function.
456 let new_state = self
457 .migration
458 .migrate(state)
459 .map_err(|msg| MigrationError {
460 stream_id: stream_id.clone(),
461 message: msg,
462 })?;
463
464 // Serialize the migrated state.
465 let payload = serde_json::to_value(&new_state).map_err(|e| MigrationError {
466 stream_id: stream_id.clone(),
467 message: format!("state serialization failed: {e}"),
468 })?;
469
470 // Save a snapshot under the new workflow's schema version. Future
471 // Process::<ToWorkflow>::state_with_snapshot() calls will load this
472 // snapshot and skip the old (incompatible) events entirely.
473 let snap = Snapshot::new(
474 stream_id.clone(),
475 last_seq,
476 M::ToWorkflow::state_schema_version(),
477 payload,
478 );
479 self.snap_store
480 .save(&snap)
481 .await
482 .map_err(|e| MigrationError {
483 stream_id: stream_id.clone(),
484 message: format!("snapshot save failed: {e}"),
485 })?;
486
487 Ok(true)
488 }
489}
490
491// ── Tests ─────────────────────────────────────────────────────────────────────
492
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        envelope::NewEvent,
        event_store::{ExpectedVersion, InMemoryEventStore},
        ids::{ConversationId, CorrelationId, ProcessId, StreamId, TenantId},
        snapshot::NoopSnapshotStore,
        version::WorkflowId,
        workflow::{CommandPayload, EventPayload, Workflow},
    };

    // ── Minimal "counter" workflows for test purposes ─────────────────────────

    /// V1 state: just a counter.
    #[derive(Default, Clone, serde::Serialize, serde::Deserialize, PartialEq, Debug)]
    struct CounterStateV1 {
        count: u32,
    }

    #[derive(Clone, serde::Serialize, serde::Deserialize)]
    enum CounterEventV1 {
        Incremented,
    }

    impl EventPayload for CounterEventV1 {
        fn event_type(&self) -> &'static str {
            "Incremented"
        }
    }

    /// Uninhabited: the migration runner never dispatches commands.
    #[derive(Clone)]
    enum CounterCommandV1 {}

    impl CommandPayload for CounterCommandV1 {}

    struct CounterWorkflowV1;
    impl Workflow for CounterWorkflowV1 {
        type State = CounterStateV1;
        type Event = CounterEventV1;
        type Command = CounterCommandV1;

        fn handle(
            _state: &Self::State,
            _cmd: Self::Command,
        ) -> Result<crate::workflow::WorkflowOutput<Self::Event>, crate::error::WorkflowError>
        {
            unreachable!("not used in migration tests")
        }

        fn apply(mut state: Self::State, event: &Self::Event) -> Self::State {
            match event {
                CounterEventV1::Incremented => state.count += 1,
            }
            state
        }
    }

    /// V2 state: V1 plus a `label` field.
    #[derive(Default, Clone, serde::Serialize, serde::Deserialize, PartialEq, Debug)]
    struct CounterStateV2 {
        count: u32,
        label: String,
    }

    #[derive(Clone, serde::Serialize, serde::Deserialize)]
    enum CounterEventV2 {
        Incremented,
    }

    impl EventPayload for CounterEventV2 {
        fn event_type(&self) -> &'static str {
            "Incremented"
        }
    }

    /// Uninhabited: the migration runner never dispatches commands.
    #[derive(Clone)]
    enum CounterCommandV2 {}

    impl CommandPayload for CounterCommandV2 {}

    struct CounterWorkflowV2;
    impl Workflow for CounterWorkflowV2 {
        type State = CounterStateV2;
        type Event = CounterEventV2;
        type Command = CounterCommandV2;

        fn handle(
            _state: &Self::State,
            _cmd: Self::Command,
        ) -> Result<crate::workflow::WorkflowOutput<Self::Event>, crate::error::WorkflowError>
        {
            unreachable!("not used in migration tests")
        }

        fn apply(mut state: Self::State, event: &Self::Event) -> Self::State {
            match event {
                CounterEventV2::Incremented => state.count += 1,
            }
            state
        }

        // Deliberately not 1, so the tests can tell that the snapshot was
        // written with the *target* workflow's schema version.
        fn state_schema_version() -> u32 {
            2
        }
    }

    // ── Migration implementation ──────────────────────────────────────────────

    /// Carries the count over and labels the state as migrated.
    struct V1ToV2;

    impl StateMigration for V1ToV2 {
        type FromWorkflow = CounterWorkflowV1;
        type ToWorkflow = CounterWorkflowV2;

        fn source_workflow_id(&self) -> &WorkflowId {
            static WID: std::sync::OnceLock<WorkflowId> = std::sync::OnceLock::new();
            WID.get_or_init(|| WorkflowId::new("counter", "FV2024-10-01"))
        }

        fn target_workflow_id(&self) -> &WorkflowId {
            static WID: std::sync::OnceLock<WorkflowId> = std::sync::OnceLock::new();
            WID.get_or_init(|| WorkflowId::new("counter", "FV2025-04-01"))
        }

        fn migrate(&self, state: CounterStateV1) -> Result<CounterStateV2, String> {
            Ok(CounterStateV2 {
                count: state.count,
                label: "migrated".into(),
            })
        }
    }

    // ── Helpers ───────────────────────────────────────────────────────────────

    /// Build one `Incremented` event stamped with `workflow_id`.
    ///
    /// Each call generates fresh process/tenant/correlation/conversation IDs;
    /// the runner under test only looks at `workflow_id`, `event_type`,
    /// `schema_version` and `payload`.
    fn make_increment_event(workflow_id: WorkflowId) -> NewEvent {
        let pid = ProcessId::new();
        let tid = TenantId::new();
        NewEvent {
            correlation_id: CorrelationId::new(),
            causation_id: None,
            conversation_id: ConversationId::new(),
            process_id: pid,
            tenant_id: tid,
            workflow_id,
            event_type: "Incremented".into(),
            schema_version: 1,
            payload: serde_json::to_value(CounterEventV1::Incremented).unwrap(),
        }
    }

    // ── Tests ─────────────────────────────────────────────────────────────────

    #[tokio::test]
    async fn migrate_matching_stream() {
        let store = InMemoryEventStore::default();
        let snaps = crate::snapshot::InMemorySnapshotStore::new();
        let sid = StreamId::new("process/counter-001");
        let wid_v1 = WorkflowId::new("counter", "FV2024-10-01");

        // Append 3 increment events under the V1 workflow.
        store
            .append(
                &sid,
                ExpectedVersion::Any,
                &[
                    make_increment_event(wid_v1.clone()),
                    make_increment_event(wid_v1.clone()),
                    make_increment_event(wid_v1.clone()),
                ],
            )
            .await
            .unwrap();

        let runner = MigrationRunner::new(V1ToV2, store, snaps.clone());
        let report = runner.run().await;

        assert!(report.is_ok(), "errors: {:?}", report.errors);
        assert_eq!(report.migrated, 1);
        assert_eq!(report.skipped, 0);

        // The snapshot should encode the migrated V2 state.
        let snap = snaps.load(&sid).await.unwrap().expect("snapshot saved");
        assert_eq!(snap.state_schema_version, 2);
        let state: CounterStateV2 = serde_json::from_value(snap.state).unwrap();
        assert_eq!(state.count, 3);
        assert_eq!(state.label, "migrated");
    }

    #[tokio::test]
    async fn skip_non_matching_stream() {
        let store = InMemoryEventStore::default();
        let snaps = NoopSnapshotStore;
        let sid = StreamId::new("process/counter-other");
        let wid_v2 = WorkflowId::new("counter", "FV2025-04-01"); // already V2

        store
            .append(&sid, ExpectedVersion::Any, &[make_increment_event(wid_v2)])
            .await
            .unwrap();

        let runner = MigrationRunner::new(V1ToV2, store, snaps);
        let report = runner.run().await;

        assert!(report.is_ok());
        assert_eq!(report.migrated, 0);
        assert_eq!(report.skipped, 1);
    }

    /// An unused store lists no streams at all, so nothing is migrated *and*
    /// nothing is skipped. (This does not exercise the "stream exists but has
    /// no events" branch of `migrate_stream`.)
    #[tokio::test]
    async fn empty_store_yields_empty_report() {
        let store = InMemoryEventStore::default();
        let snaps = NoopSnapshotStore;
        let runner = MigrationRunner::new(V1ToV2, store, snaps);
        let report = runner.run().await;

        assert!(report.is_ok());
        assert_eq!(report.migrated, 0);
        assert_eq!(report.skipped, 0);
    }

    #[tokio::test]
    async fn migration_fn_error_is_recorded_not_fatal() {
        struct FailingMigration;

        impl StateMigration for FailingMigration {
            type FromWorkflow = CounterWorkflowV1;
            type ToWorkflow = CounterWorkflowV2;
            fn source_workflow_id(&self) -> &WorkflowId {
                static WID: std::sync::OnceLock<WorkflowId> = std::sync::OnceLock::new();
                WID.get_or_init(|| WorkflowId::new("counter", "FV2024-10-01"))
            }

            fn target_workflow_id(&self) -> &WorkflowId {
                static WID: std::sync::OnceLock<WorkflowId> = std::sync::OnceLock::new();
                WID.get_or_init(|| WorkflowId::new("counter", "FV2025-04-01"))
            }

            fn migrate(&self, _state: CounterStateV1) -> Result<CounterStateV2, String> {
                Err("intentional test failure".into())
            }
        }

        let store = InMemoryEventStore::default();
        let snaps = NoopSnapshotStore;
        let sid = StreamId::new("process/failing");
        let wid_v1 = WorkflowId::new("counter", "FV2024-10-01");

        store
            .append(&sid, ExpectedVersion::Any, &[make_increment_event(wid_v1)])
            .await
            .unwrap();

        let runner = MigrationRunner::new(FailingMigration, store, snaps);
        let report = runner.run().await;

        assert!(!report.is_ok());
        assert_eq!(report.errors.len(), 1);
        assert_eq!(report.migrated, 0);
        assert!(
            report.errors[0]
                .message
                .contains("intentional test failure")
        );
    }

    #[test]
    fn parse_process_stream_id_accepts_two_uuid_segments() {
        let nil = uuid::Uuid::nil();
        let sid = format!("process/{nil}/{nil}");

        assert!(parse_process_stream_id(&sid).is_some());
    }

    #[test]
    fn parse_process_stream_id_rejects_malformed_ids() {
        let nil = uuid::Uuid::nil();

        // Empty input and missing `process/` prefix.
        assert!(parse_process_stream_id("").is_none());
        assert!(parse_process_stream_id(&format!("stream/{nil}/{nil}")).is_none());
        // Only one segment after the prefix.
        assert!(parse_process_stream_id("process/counter-001").is_none());
        assert!(parse_process_stream_id(&format!("process/{nil}")).is_none());
        // Segments that are not UUIDs.
        assert!(parse_process_stream_id(&format!("process/not-a-uuid/{nil}")).is_none());
        assert!(parse_process_stream_id(&format!("process/{nil}/not-a-uuid")).is_none());
        // Trailing extra segment ends up in the process part and fails to parse.
        assert!(parse_process_stream_id(&format!("process/{nil}/{nil}/extra")).is_none());
    }
}
758}