Struct MigrationRunner 

pub struct MigrationRunner<M, ES, SS> { /* private fields */ }

Drives a StateMigration over every event stream in a store.

Constructed with separate EventStore and SnapshotStore handles so the runner can operate against any backend combination (e.g. SlateDbStore for events and SlateDbSnapshotStore for snapshots, or in-memory stores during testing).

Concurrency

run() processes streams sequentially. For deployments with thousands of in-flight processes, run the call in a dedicated migration task and narrow the scan scope with a stream-id prefix filter via EventStore::list_streams.
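
As a rough illustration of narrowing the scan scope, the sketch below filters an already-listed set of stream IDs by prefix. The helper filter_by_prefix is hypothetical and not part of this crate; the actual signature of EventStore::list_streams is not shown on this page, so the sketch works on plain strings rather than calling it.

```rust
/// Hypothetical helper (not part of the documented API): keep only the
/// stream IDs that start with `prefix`, preserving their original order.
fn filter_by_prefix<'a>(stream_ids: &'a [String], prefix: &str) -> Vec<&'a str> {
    stream_ids
        .iter()
        .map(String::as_str)
        // A tenant-scoped prefix such as "process/acme/" limits the scan
        // to a single tenant's streams.
        .filter(|id| id.starts_with(prefix))
        .collect()
}
```

Called with the prefix "process/acme/", this would keep only stream IDs belonging to the assumed tenant "acme", so a migration task could be scoped to one tenant at a time.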

Implementations

impl<M, ES, SS> MigrationRunner<M, ES, SS>

pub fn new(migration: M, event_store: ES, snap_store: SS) -> Self

Construct a new runner.

pub async fn run(&self) -> MigrationReport

Scans all event streams and migrates those whose workflow_id matches source_workflow_id.

  • Streams with no events or a different workflow_id are counted in MigrationReport::skipped.
  • Streams that fail (replay error, migrate() returning Err, or snapshot write failure) are recorded in MigrationReport::errors and do not abort the run.

If list_streams itself fails, the run stops immediately and returns a report containing a single error entry for "(list_streams)".
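
The accounting rules above can be sketched as a small synchronous loop. This is a simplified model, not the crate's implementation: Report, Outcome, and run_sketch are hypothetical names, the migrated counter and the (stream_id, message) shape of an error entry are assumptions, and the real run() is async.

```rust
/// Hypothetical stand-in for MigrationReport. Only `skipped` and `errors`
/// are named in the docs; `migrated` and the tuple shape are assumptions.
#[derive(Debug, Default, PartialEq)]
struct Report {
    migrated: usize,
    skipped: usize,
    errors: Vec<(String, String)>, // (stream_id, message)
}

/// Hypothetical per-stream result.
enum Outcome {
    Migrated,
    Skipped,        // no events, or a different workflow_id
    Failed(String), // replay error, migrate() Err, or snapshot write failure
}

/// Processes streams one by one; a failing stream is recorded and the
/// loop continues. A failed listing short-circuits with one error entry.
fn run_sketch<F>(listing: Result<Vec<String>, String>, mut migrate_one: F) -> Report
where
    F: FnMut(&str) -> Outcome,
{
    let mut report = Report::default();
    let streams = match listing {
        Ok(streams) => streams,
        Err(message) => {
            report.errors.push(("(list_streams)".to_string(), message));
            return report;
        }
    };
    for id in &streams {
        match migrate_one(id) {
            Outcome::Migrated => report.migrated += 1,
            Outcome::Skipped => report.skipped += 1,
            Outcome::Failed(message) => report.errors.push((id.clone(), message)),
        }
    }
    report
}
```

The point of the sketch is that a per-stream failure only adds an entry to errors, while a listing failure is the one case that ends the run early.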

pub async fn run_and_update_registry<R>(&self, registry: &R) -> MigrationReport
where R: ProcessRegistry,

Like run, but also updates ProcessRegistry entries after each successful migration.

For every migrated stream the runner:

  1. Parses (tenant_id, process_id) from the stream ID (process/{tenant_id}/{process_id}).
  2. Looks up RegistryKey::from_process(process_id) for that tenant.
  3. Rewrites the stored ProcessIdentity with the new workflow_id and updated stream_id (which embeds the tenant discriminator).
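
Step 1 can be illustrated with a small parser for the process/{tenant_id}/{process_id} layout. The function name parse_process_stream_id is hypothetical, and rejecting empty components or a process_id containing a further '/' is an assumption; the crate's own parsing rules may differ.

```rust
/// Hypothetical helper: split a stream ID of the form
/// `process/{tenant_id}/{process_id}` into `(tenant_id, process_id)`.
/// Returns `None` for any other shape.
fn parse_process_stream_id(stream_id: &str) -> Option<(&str, &str)> {
    // The stream ID must start with the literal "process/" segment.
    let rest = stream_id.strip_prefix("process/")?;
    // Split the remainder at the first '/' into tenant and process parts.
    let (tenant_id, process_id) = rest.split_once('/')?;
    // Assumption: both parts are non-empty and process_id has no extra '/'.
    if tenant_id.is_empty() || process_id.is_empty() || process_id.contains('/') {
        return None;
    }
    Some((tenant_id, process_id))
}
```

Under these assumptions, "process/acme/42" yields the tenant "acme" and the process "42", while stream IDs with another prefix or a missing segment yield None.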

Entries for conversation- or correlation-based routing keys are typically short-lived (they exist only during an active EDIFACT exchange) and do not need updating here.

Registry update failures are recorded as warnings in the returned MigrationReport but do not roll back the snapshot that was already written.

Auto Trait Implementations

impl<M, ES, SS> Freeze for MigrationRunner<M, ES, SS>
where M: Freeze, ES: Freeze, SS: Freeze,

impl<M, ES, SS> RefUnwindSafe for MigrationRunner<M, ES, SS>

impl<M, ES, SS> Send for MigrationRunner<M, ES, SS>
where M: Send, ES: Send, SS: Send,

impl<M, ES, SS> Sync for MigrationRunner<M, ES, SS>
where M: Sync, ES: Sync, SS: Sync,

impl<M, ES, SS> Unpin for MigrationRunner<M, ES, SS>
where M: Unpin, ES: Unpin, SS: Unpin,

impl<M, ES, SS> UnsafeUnpin for MigrationRunner<M, ES, SS>
where M: UnsafeUnpin, ES: UnsafeUnpin, SS: UnsafeUnpin,

impl<M, ES, SS> UnwindSafe for MigrationRunner<M, ES, SS>
where M: UnwindSafe, ES: UnwindSafe, SS: UnwindSafe,
