pub struct MigrationRunner<M, ES, SS> { /* private fields */ }Expand description
Drives a StateMigration over every event stream in a store.
Constructed with separate EventStore and SnapshotStore handles so
the runner can operate against any backend combination (e.g. SlateDbStore
for events and SlateDbSnapshotStore for snapshots, or in-memory stores
during testing).
§Concurrency
run() processes streams sequentially. For deployments with thousands
of in-flight processes, wrapping the call in a dedicated migration task and
using a custom prefix filter via EventStore::list_streams (e.g. filtering
by stream-id prefix) can reduce the scan scope.
Implementations§
Source§impl<M, ES, SS> MigrationRunner<M, ES, SS>where
M: StateMigration,
<M::FromWorkflow as Workflow>::State: DeserializeOwned,
<M::ToWorkflow as Workflow>::State: Serialize,
ES: EventStore,
SS: SnapshotStore,
impl<M, ES, SS> MigrationRunner<M, ES, SS>where
M: StateMigration,
<M::FromWorkflow as Workflow>::State: DeserializeOwned,
<M::ToWorkflow as Workflow>::State: Serialize,
ES: EventStore,
SS: SnapshotStore,
Sourcepub async fn run(&self) -> MigrationReport
pub async fn run(&self) -> MigrationReport
Scan all event streams and migrate those that match source_workflow_id.
- Streams with no events or a different
workflow_idare counted inMigrationReport::skipped. - Streams that fail (replay error,
migrate()returningErr, or snapshot write failure) are recorded inMigrationReport::errorsand do not abort the run.
If list_streams itself fails, a single error entry covering
"(list_streams)" is returned immediately.
Sourcepub async fn run_and_update_registry<R>(&self, registry: &R) -> MigrationReportwhere
R: ProcessRegistry,
pub async fn run_and_update_registry<R>(&self, registry: &R) -> MigrationReportwhere
R: ProcessRegistry,
Like run but also updates ProcessRegistry entries after each
successful migration.
For every migrated stream the runner:
- Parses
(tenant_id, process_id)from the stream ID (process/{tenant_id}/{process_id}). - Looks up
RegistryKey::from_process(process_id)for that tenant. - Rewrites the stored
ProcessIdentitywith the newworkflow_idand updatedstream_id(which embeds the tenant discriminator).
Entries for conversation- or correlation-based routing keys are typically short-lived (they exist only during an active EDIFACT exchange) and do not need updating here.
Registry update failures are recorded as warnings in the returned
MigrationReport but do not roll back the snapshot that was
already written.