High-Performance, Batteries-Included, Event Sourcing for Rust
This crate supports a style of transaction processing known as "event sourcing." That name is rather opaque, but the basic idea is quite simple: instead of storing mutable records that get updated as state changes, event-sourcing systems store a series of immutable events describing those state changes. When the system needs to know the state of a given entity, it selects the events related to that entity and reduces (aka folds) them into an "aggregate," which is the current state of that entity. In other words, the current state of a transaction is actually calculated from the events recorded about that transaction.
Aggregates can be cached aggressively because the events they were calculated from are immutable, and new events are always appended to the end of the log. A cached aggregate can also be quickly updated by selecting and applying only the events that were recorded after it was created.
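In plain Rust terms, reducing a log into an aggregate is just a fold over the events, and a cached aggregate can be brought up to date by applying only the events recorded after it was calculated. A minimal sketch (the names below are illustrative, not this crate's API):

```rust
// Illustrative types only, not this crate's API.
#[derive(Clone, Debug, PartialEq)]
enum PaymentEvent {
    Requested { amount: i64 },
    Captured { amount: i64 },
    Refunded { amount: i64 },
}

#[derive(Clone, Debug, Default, PartialEq)]
struct Payment {
    amount_requested: i64,
    amount_captured: i64,
    amount_refunded: i64,
    // Index of the next event to apply; this is what lets a cached
    // aggregate be updated by applying only newer events.
    next_event_index: usize,
}

impl Payment {
    fn apply(&mut self, event: &PaymentEvent) {
        match event {
            PaymentEvent::Requested { amount } => self.amount_requested = *amount,
            PaymentEvent::Captured { amount } => self.amount_captured += amount,
            PaymentEvent::Refunded { amount } => self.amount_refunded += amount,
        }
        self.next_event_index += 1;
    }
}

// Reduce = fold every event recorded since the aggregate was calculated.
fn reduce(mut aggregate: Payment, log: &[PaymentEvent]) -> Payment {
    for event in &log[aggregate.next_event_index..] {
        aggregate.apply(event);
    }
    aggregate
}

fn main() {
    let mut log = vec![
        PaymentEvent::Requested { amount: 1000 },
        PaymentEvent::Captured { amount: 600 },
    ];
    // Full reduction from the default (empty) aggregate...
    let cached = reduce(Payment::default(), &log);
    assert_eq!(cached.amount_captured, 600);

    // ...later, a new event is appended; only it gets applied.
    log.push(PaymentEvent::Refunded { amount: 600 });
    let updated = reduce(cached, &log);
    assert_eq!(updated.amount_refunded, 600);
    assert_eq!(updated.next_event_index, 3);
}
```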
This approach provides not only a full audit trail for how a given entity's state ended up the way it did, but also an easy way to record events that can happen multiple times to the same entity. For example, a payment may be partially captured or refunded several times, but each of those events will have its own distinct properties for the monetary amounts and reference numbers.
It's also possible for different parts of your system to calculate different kinds of aggregates from the same set of underlying events. For example, the aggregate for the main payment service may contain all the relevant payment details, but the aggregate for a disputes service might only need the details about the particular capture/clearing that is being disputed. This is similar to how relational databases can support multiple materialized views over the same underlying set of tables.
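To illustrate, here is a hedged sketch of two services reducing different aggregates from the same event log; the event and aggregate names are invented for this example and are not part of this crate:

```rust
// One shared set of immutable events...
#[derive(Clone, Debug)]
pub enum PaymentEvent {
    Requested { amount: i64 },
    Captured { amount: i64, reference: String },
    Disputed { reference: String },
}

// ...the payment service's aggregate tracks overall money movement...
#[derive(Default, Debug, PartialEq)]
pub struct PaymentTotals {
    pub requested: i64,
    pub captured: i64,
}

// ...while a disputes service only needs the amount of the particular
// capture being disputed.
#[derive(Default, Debug, PartialEq)]
pub struct DisputeView {
    pub disputed_amount: Option<i64>,
}

pub fn reduce_totals(events: &[PaymentEvent]) -> PaymentTotals {
    let mut agg = PaymentTotals::default();
    for event in events {
        match event {
            PaymentEvent::Requested { amount } => agg.requested = *amount,
            PaymentEvent::Captured { amount, .. } => agg.captured += amount,
            PaymentEvent::Disputed { .. } => {} // irrelevant to totals
        }
    }
    agg
}

pub fn reduce_dispute(events: &[PaymentEvent]) -> DisputeView {
    // Find the disputed reference, then the amount of that capture.
    let disputed_ref = events.iter().find_map(|e| match e {
        PaymentEvent::Disputed { reference } => Some(reference.clone()),
        _ => None,
    });
    let disputed_amount = disputed_ref.and_then(|r| {
        events.iter().find_map(|e| match e {
            PaymentEvent::Captured { amount, reference } if *reference == r => Some(*amount),
            _ => None,
        })
    });
    DisputeView { disputed_amount }
}

fn main() {
    let events = vec![
        PaymentEvent::Requested { amount: 1000 },
        PaymentEvent::Captured { amount: 400, reference: "cap-1".into() },
        PaymentEvent::Captured { amount: 600, reference: "cap-2".into() },
        PaymentEvent::Disputed { reference: "cap-2".into() },
    ];
    assert_eq!(reduce_totals(&events).captured, 1000);
    assert_eq!(reduce_dispute(&events).disputed_amount, Some(600));
}
```

Neither reducer needs to know about the other; each derives exactly the view it needs from the same immutable history.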
The drawback of this approach is that it makes listing and querying entities more complex: if you store individual events and calculate the overall state, how do you quickly find all payments that have been fully refunded? Most event-sourcing systems handle this by sending these sorts of queries to a separate, highly-indexed database containing aggregate snapshots. These aggregates are typically recalculated and updated asynchronously in response to new events written to the transaction-processing database. They are eventually-consistent, but listing/querying APIs are often that way in large distributed systems. This also keeps the writes to the transaction-processing database very fast since those are just inserts to a minimally-indexed table. This division of labor is typically called "Command Query Responsibility Segregation," or CQRS for short.
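A toy sketch of that division of labor, with the asynchronous projection shown synchronously for brevity (all names here are illustrative, not this crate's API):

```rust
use std::collections::HashMap;

// Write side: events are simply appended to a minimally-indexed log,
// which keeps transaction-processing writes fast.
#[derive(Clone)]
pub enum PaymentEvent {
    Captured { payment_id: u32, amount: i64 },
    Refunded { payment_id: u32, amount: i64 },
}

// Read side: an indexed snapshot table, recalculated in response to
// new events (eventually-consistent in a real system).
#[derive(Default, Clone)]
pub struct PaymentSnapshot {
    pub captured: i64,
    pub refunded: i64,
}

pub fn project(snapshots: &mut HashMap<u32, PaymentSnapshot>, events: &[PaymentEvent]) {
    for event in events {
        match event {
            PaymentEvent::Captured { payment_id, amount } => {
                snapshots.entry(*payment_id).or_default().captured += amount;
            }
            PaymentEvent::Refunded { payment_id, amount } => {
                snapshots.entry(*payment_id).or_default().refunded += amount;
            }
        }
    }
}

// Queries like "all fully-refunded payments" hit the snapshot table
// instead of re-reducing every log.
pub fn fully_refunded(snapshots: &HashMap<u32, PaymentSnapshot>) -> Vec<u32> {
    let mut ids: Vec<u32> = snapshots
        .iter()
        .filter(|(_, s)| s.captured > 0 && s.refunded >= s.captured)
        .map(|(id, _)| *id)
        .collect();
    ids.sort();
    ids
}

fn main() {
    let mut snapshots = HashMap::new();
    project(&mut snapshots, &[
        PaymentEvent::Captured { payment_id: 1, amount: 500 },
        PaymentEvent::Captured { payment_id: 2, amount: 800 },
        PaymentEvent::Refunded { payment_id: 1, amount: 500 },
    ]);
    assert_eq!(fully_refunded(&snapshots), vec![1]);
}
```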
⚠️ Caution: The crate is functional and tested, but hasn't been used in production yet, so use at your own risk! If you'd like to do a pilot, create a tracking issue on GitHub and I'll gladly help you.
Built-In Features
This is a batteries-included library that offers features one typically needs in a high-throughput distributed system:
- Idempotency: When creating a new log or appending an event to an existing one, the caller can include a unique `idempotency_key` that ensures the operation occurs only once, even if the request is retried. Idempotent replays will return an `IdempotentReplay` error with the previously-recorded `LogId` and event index, so that you can easily detect and react to them appropriately.
- Optimistic Concurrency: If multiple service instances attempt to append a new event to the same log at the same time, only one will win the race, and the others will receive an error. The losers can then re-reduce the log to see the effect of the new event on the aggregate, determine if their operation is still relevant, and try again.
- Async Aggregate Caching: When you reduce a log, the resulting aggregate is written asynchronously to a cache like Redis. Subsequent calls to `reduce()` will reuse that cached aggregate, and only fetch/apply events that were recorded after the aggregate was last calculated. This makes subsequent reductions faster without slowing down your code.
- Caching Policies: Aggregates are always cached by default, but if you want to control when this occurs based on aggregate properties, you can provide an implementation of `CachingPolicy`. For example, if the state of the aggregate tells you that it will never be loaded again, you can skip caching it.
- Event Streaming and Paging: When reducing, events are asynchronously streamed from the database instead of buffered, to limit the amount of memory consumed. But the library also offers a convenience method you can use to get a page of events at a time as a `Vec`, which makes it easier to return them as a JSON array from your service's API.
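The optimistic-concurrency mechanism can be sketched in a few lines, independent of this crate's API (the `Log` type and `append` signature below are invented for illustration): an append succeeds only if the caller's expected next event index still matches the log's length.

```rust
// Illustrative sketch, not this crate's API.
pub struct Log<E> {
    pub events: Vec<E>,
}

#[derive(Debug, PartialEq)]
pub enum AppendError {
    ConcurrentAppend,
}

impl<E> Log<E> {
    pub fn append(&mut self, expected_index: usize, event: E) -> Result<(), AppendError> {
        if self.events.len() != expected_index {
            // Another writer appended first; the caller should re-reduce,
            // re-check its operation, and retry if still relevant.
            return Err(AppendError::ConcurrentAppend);
        }
        self.events.push(event);
        Ok(())
    }
}

fn main() {
    let mut log = Log { events: vec!["requested"] };
    // Two writers both reduced the log when it held one event;
    // only the first append at index 1 wins the race.
    assert!(log.append(1, "captured").is_ok());
    assert_eq!(log.append(1, "voided"), Err(AppendError::ConcurrentAppend));
    // After re-reducing (the log now holds 2 events), the loser can retry.
    assert!(log.append(2, "voided").is_ok());
}
```

In a real database this check is typically enforced with a unique constraint on `(log_id, event_index)`, so the race is decided atomically at insert time.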
Example Usage
```rust
use std::error::Error;
use eventlogs::{Aggregate, CreateOptions, EventRecord, LogId, LogManager};
use eventlogs::caches::fake::FakeReductionCache;
use eventlogs::stores::fake::FakeEventStore;
use serde::{Deserialize, Serialize};

// A typical application of event-sourcing is the tracking of payments.
// A payment is really a series of events: authorization, increment,
// reversal, capture, clearing, refund, dispute, etc. Most events
// can occur several times, but each must capture distinct properties
// (e.g., the amount refunded). The overall state of the payment can
// then be reduced from these events.

// Let's start by defining a struct to hold the initial payment request
// properties, which would include details about the card, cardholder,
// amount requested, etc. To keep things simple, amounts will be tracked
// in minor units with an assumed single currency.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct PaymentRequest {
    pub amount_requested: isize,
}

// Now let's define our events, which are typically variants in an enum.
// Since this is just an example, we'll define only a subset with only
// the most relevant properties. Timestamps are added automatically
// by this crate, so we don't need to define them in each event.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PaymentEvent {
    Requested { request: PaymentRequest },
    Captured { amount_captured: isize },
    Refunded { amount_refunded: isize },
}

// Now let's define the "aggregate" for these events, which is the overall
// state of the payment. This is what we will reduce from the events,
// and use to decide if the current API request or operation is allowable.
// Aggregates must implement/derive Default, and implement Aggregate.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct Payment {
    pub request: PaymentRequest,
    pub amount_captured: isize,
    pub amount_refunded: isize,
}

// To make Payment an aggregate, implement the Aggregate trait, which
// adds a method for applying each event to the aggregate's current state.
impl Aggregate for Payment {
    type Event = PaymentEvent;
    fn apply(&mut self, event_record: &impl EventRecord<Self::Event>) {
        match event_record.event() {
            PaymentEvent::Requested { request } => self.request = request.clone(),
            PaymentEvent::Captured { amount_captured } => self.amount_captured += amount_captured,
            PaymentEvent::Refunded { amount_refunded } => self.amount_refunded += amount_refunded,
        }
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // Use the fake in-memory store and cache for this example; swap in
    // PostgresEventStore and RedisReductionCache in production.
    let log_manager = LogManager::new(
        FakeEventStore::<PaymentEvent>::new(),
        FakeReductionCache::<Payment>::new(),
    );
    // Create a new log whose first event is the payment request, then
    // reduce it to get the current state of the payment.
    let log_id = LogId::new();
    let request = PaymentRequest { amount_requested: 10000 };
    log_manager
        .create(&log_id, &PaymentEvent::Requested { request }, &CreateOptions::default())
        .await?;
    let reduction = log_manager.reduce(&log_id).await?;
    assert_eq!(reduction.aggregate().request.amount_requested, 10000);
    Ok(())
}
```
Cargo Features
This crate defines the following Cargo/compiler features:
| Name | Description | Default? |
|---|---|---|
| postgres-store | Enables the PostgresEventStore | Yes |
| redis-cache | Enables the RedisReductionCache | Yes |
Since Postgres and Redis are very common choices, these features
are on by default. As more EventStore and ReductionCache
implementations are added in the future, corresponding non-default
features will be defined.
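For example, to keep the Postgres store but opt out of the Redis cache, you could disable default features in your Cargo.toml; the dependency name and version below are placeholders, so substitute this crate's actual ones:

```toml
[dependencies]
# Placeholder name/version; substitute this crate's actual ones.
example-event-sourcing-crate = { version = "0.1", default-features = false, features = ["postgres-store"] }
```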