pub trait RaftStorage<C>: RaftLogReader<C> + Send + Sync + 'staticwhere
    C: RaftTypeConfig,{
    type SnapshotData: AsyncRead + AsyncWrite + AsyncSeek + Send + Sync + Unpin + 'static;
    type LogReader: RaftLogReader<C>;
    type SnapshotBuilder: RaftSnapshotBuilder<C, Self::SnapshotData>;

    // Required methods
    fn save_vote<'life0, 'life1, 'async_trait>(
        &'life0 mut self,
        vote: &'life1 Vote<C::NodeId>
    ) -> Pin<Box<dyn Future<Output = Result<(), StorageError<C::NodeId>>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait,
             'life1: 'async_trait;
    fn read_vote<'life0, 'async_trait>(
        &'life0 mut self
    ) -> Pin<Box<dyn Future<Output = Result<Option<Vote<C::NodeId>>, StorageError<C::NodeId>>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait;
    fn get_log_reader<'life0, 'async_trait>(
        &'life0 mut self
    ) -> Pin<Box<dyn Future<Output = Self::LogReader> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait;
    fn append_to_log<'life0, 'life1, 'life2, 'async_trait>(
        &'life0 mut self,
        entries: &'life1 [&'life2 Entry<C>]
    ) -> Pin<Box<dyn Future<Output = Result<(), StorageError<C::NodeId>>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait,
             'life1: 'async_trait,
             'life2: 'async_trait;
    fn delete_conflict_logs_since<'life0, 'async_trait>(
        &'life0 mut self,
        log_id: LogId<C::NodeId>
    ) -> Pin<Box<dyn Future<Output = Result<(), StorageError<C::NodeId>>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait;
    fn purge_logs_upto<'life0, 'async_trait>(
        &'life0 mut self,
        log_id: LogId<C::NodeId>
    ) -> Pin<Box<dyn Future<Output = Result<(), StorageError<C::NodeId>>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait;
    fn last_applied_state<'life0, 'async_trait>(
        &'life0 mut self
    ) -> Pin<Box<dyn Future<Output = Result<(Option<LogId<C::NodeId>>, StoredMembership<C::NodeId, C::Node>), StorageError<C::NodeId>>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait;
    fn apply_to_state_machine<'life0, 'life1, 'life2, 'async_trait>(
        &'life0 mut self,
        entries: &'life1 [&'life2 Entry<C>]
    ) -> Pin<Box<dyn Future<Output = Result<Vec<C::R>, StorageError<C::NodeId>>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait,
             'life1: 'async_trait,
             'life2: 'async_trait;
    fn get_snapshot_builder<'life0, 'async_trait>(
        &'life0 mut self
    ) -> Pin<Box<dyn Future<Output = Self::SnapshotBuilder> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait;
    fn begin_receiving_snapshot<'life0, 'async_trait>(
        &'life0 mut self
    ) -> Pin<Box<dyn Future<Output = Result<Box<Self::SnapshotData>, StorageError<C::NodeId>>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait;
    fn install_snapshot<'life0, 'life1, 'async_trait>(
        &'life0 mut self,
        meta: &'life1 SnapshotMeta<C::NodeId, C::Node>,
        snapshot: Box<Self::SnapshotData>
    ) -> Pin<Box<dyn Future<Output = Result<(), StorageError<C::NodeId>>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait,
             'life1: 'async_trait;
    fn get_current_snapshot<'life0, 'async_trait>(
        &'life0 mut self
    ) -> Pin<Box<dyn Future<Output = Result<Option<Snapshot<C::NodeId, C::Node, Self::SnapshotData>>, StorageError<C::NodeId>>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait;
}
Expand description

A trait defining the interface for a Raft storage system.

See the storage chapter of the guide for details and discussion on this trait and how to implement it.

Typically, the storage implementation as such will be hidden behind a Box<T>, Arc<T> or a similar, more advanced reference type and this interface implemented on that reference type.

All methods on the storage are called inside of Raft core task. There is no concurrency on the storage, except concurrency with snapshot builder and log reader, both created by this API. The implementation of the API has to cope with (infrequent) concurrent access from these two components.

Required Associated Types§

source

type SnapshotData: AsyncRead + AsyncWrite + AsyncSeek + Send + Sync + Unpin + 'static

The storage engine’s associated type used for exposing a snapshot for reading & writing.

See the storage chapter of the guide for details on where and how this is used.

source

type LogReader: RaftLogReader<C>

Log reader type.

source

type SnapshotBuilder: RaftSnapshotBuilder<C, Self::SnapshotData>

Snapshot builder type.

Required Methods§

source

fn save_vote<'life0, 'life1, 'async_trait>( &'life0 mut self, vote: &'life1 Vote<C::NodeId> ) -> Pin<Box<dyn Future<Output = Result<(), StorageError<C::NodeId>>> + Send + 'async_trait>>where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

To ensure correctness: the vote must be persisted on disk before returning.

source

fn read_vote<'life0, 'async_trait>( &'life0 mut self ) -> Pin<Box<dyn Future<Output = Result<Option<Vote<C::NodeId>>, StorageError<C::NodeId>>> + Send + 'async_trait>>where Self: 'async_trait, 'life0: 'async_trait,

source

fn get_log_reader<'life0, 'async_trait>( &'life0 mut self ) -> Pin<Box<dyn Future<Output = Self::LogReader> + Send + 'async_trait>>where Self: 'async_trait, 'life0: 'async_trait,

Get the log reader.

The method is intentionally async to give the implementation a chance to use asynchronous sync primitives to serialize access to the common internal object, if needed.

source

fn append_to_log<'life0, 'life1, 'life2, 'async_trait>( &'life0 mut self, entries: &'life1 [&'life2 Entry<C>] ) -> Pin<Box<dyn Future<Output = Result<(), StorageError<C::NodeId>>> + Send + 'async_trait>>where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Append a payload of entries to the log.

Though the entries will always be presented in order, each entry’s index should be used to determine its location to be written in the log.

To ensure correctness:

  • All entries must be persisted on disk before returning.

  • There must not be a hole in logs. Because Raft only examine the last log id to ensure correctness.

source

fn delete_conflict_logs_since<'life0, 'async_trait>( &'life0 mut self, log_id: LogId<C::NodeId> ) -> Pin<Box<dyn Future<Output = Result<(), StorageError<C::NodeId>>> + Send + 'async_trait>>where Self: 'async_trait, 'life0: 'async_trait,

Delete conflict log entries since log_id, inclusive.

This method is called by a follower or learner when the local logs conflict with the leaders.

To ensure correctness:

  • When this function returns, the deleted logs must not be read(e.g., by RaftLogReader::try_get_log_entries()) any more.

  • It must not leave a hole in the log. In other words, if it has to delete logs in more than one transactions, it must delete logs in backward order. So that in a case server crashes, it won’t leave a hole.

source

fn purge_logs_upto<'life0, 'async_trait>( &'life0 mut self, log_id: LogId<C::NodeId> ) -> Pin<Box<dyn Future<Output = Result<(), StorageError<C::NodeId>>> + Send + 'async_trait>>where Self: 'async_trait, 'life0: 'async_trait,

Delete applied log entries upto log_id, inclusive.

To ensure correctness:

  • It must not leave a hole in logs.
source

fn last_applied_state<'life0, 'async_trait>( &'life0 mut self ) -> Pin<Box<dyn Future<Output = Result<(Option<LogId<C::NodeId>>, StoredMembership<C::NodeId, C::Node>), StorageError<C::NodeId>>> + Send + 'async_trait>>where Self: 'async_trait, 'life0: 'async_trait,

Returns the last applied log id which is recorded in state machine, and the last applied membership config.

Correctness requirements

It is all right to return a membership with greater log id than the last-applied-log-id.

source

fn apply_to_state_machine<'life0, 'life1, 'life2, 'async_trait>( &'life0 mut self, entries: &'life1 [&'life2 Entry<C>] ) -> Pin<Box<dyn Future<Output = Result<Vec<C::R>, StorageError<C::NodeId>>> + Send + 'async_trait>>where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Apply the given payload of entries to the state machine.

The Raft protocol guarantees that only logs which have been committed, that is, logs which have been replicated to a quorum of the cluster, will be applied to the state machine.

This is where the business logic of interacting with your application’s state machine should live. This is 100% application specific. Perhaps this is where an application specific transaction is being started, or perhaps committed. This may be where a key/value is being stored.

For every entry to apply, an implementation should:

  • Store the log id as last applied log id.
  • Deal with the EntryPayload::Normal() log, which is business logic log.
  • Store membership config in EntryPayload::Membership.

Note that for a membership log, the implementation need to do nothing about it, except storing it.

An implementation may choose to persist either the state machine or the snapshot:

  • An implementation with persistent state machine: persists the state on disk before returning from apply_to_state_machine(). So that a snapshot does not need to be persistent.

  • An implementation with persistent snapshot: apply_to_state_machine() does not have to persist state on disk. But every snapshot has to be persistent. And when starting up the application, the state machine should be rebuilt from the last snapshot.

source

fn get_snapshot_builder<'life0, 'async_trait>( &'life0 mut self ) -> Pin<Box<dyn Future<Output = Self::SnapshotBuilder> + Send + 'async_trait>>where Self: 'async_trait, 'life0: 'async_trait,

Get the snapshot builder for the state machine.

The method is intentionally async to give the implementation a chance to use asynchronous sync primitives to serialize access to the common internal object, if needed.

source

fn begin_receiving_snapshot<'life0, 'async_trait>( &'life0 mut self ) -> Pin<Box<dyn Future<Output = Result<Box<Self::SnapshotData>, StorageError<C::NodeId>>> + Send + 'async_trait>>where Self: 'async_trait, 'life0: 'async_trait,

Create a new blank snapshot, returning a writable handle to the snapshot object.

Raft will use this handle to receive snapshot data.

implementation guide

See the storage chapter of the guide for details on log compaction / snapshotting.

source

fn install_snapshot<'life0, 'life1, 'async_trait>( &'life0 mut self, meta: &'life1 SnapshotMeta<C::NodeId, C::Node>, snapshot: Box<Self::SnapshotData> ) -> Pin<Box<dyn Future<Output = Result<(), StorageError<C::NodeId>>> + Send + 'async_trait>>where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Install a snapshot which has finished streaming from the leader.

All other snapshots should be deleted at this point.

snapshot

A snapshot created from an earlier call to begin_receiving_snapshot which provided the snapshot.

source

fn get_current_snapshot<'life0, 'async_trait>( &'life0 mut self ) -> Pin<Box<dyn Future<Output = Result<Option<Snapshot<C::NodeId, C::Node, Self::SnapshotData>>, StorageError<C::NodeId>>> + Send + 'async_trait>>where Self: 'async_trait, 'life0: 'async_trait,

Get a readable handle to the current snapshot, along with its metadata.

implementation algorithm

Implementing this method should be straightforward. Check the configured snapshot directory for any snapshot files. A proper implementation will only ever have one active snapshot, though another may exist while it is being created. As such, it is recommended to use a file naming pattern which will allow for easily distinguishing between the current live snapshot, and any new snapshot which is being created.

A proper snapshot implementation will store the term, index and membership config as part of the snapshot, which should be decoded for creating this method’s response data.

Implementors§

source§

impl<C, T> RaftStorage<C> for StoreExt<C, T>where T: RaftStorage<C>, C: RaftTypeConfig,

§

type SnapshotData = <T as RaftStorage<C>>::SnapshotData

§

type LogReader = LogReaderExt<C, T>

§

type SnapshotBuilder = SnapshotBuilderExt<C, T>