openraft/storage/mod.rs
1//! The Raft storage interface and data types.
2
3#[cfg(not(feature = "storage-v2"))]
4pub(crate) mod adapter;
5mod callback;
6mod helper;
7mod log_store_ext;
8mod snapshot_signature;
9mod v2;
10
11use std::fmt;
12use std::fmt::Debug;
13use std::ops::RangeBounds;
14
15#[cfg(not(feature = "storage-v2"))]
16pub use adapter::Adaptor;
17pub use helper::StorageHelper;
18pub use log_store_ext::RaftLogReaderExt;
19use openraft_macros::add_async_trait;
20pub use snapshot_signature::SnapshotSignature;
21pub use v2::RaftLogStorage;
22pub use v2::RaftLogStorageExt;
23pub use v2::RaftStateMachine;
24
25use crate::display_ext::DisplayOption;
26use crate::node::Node;
27use crate::raft_types::SnapshotId;
28pub use crate::storage::callback::LogApplied;
29pub use crate::storage::callback::LogFlushed;
30use crate::LogId;
31use crate::MessageSummary;
32use crate::NodeId;
33use crate::OptionalSend;
34use crate::OptionalSync;
35use crate::RaftTypeConfig;
36use crate::StorageError;
37use crate::StoredMembership;
38use crate::Vote;
39
40/// The metadata of a snapshot.
41///
42/// Including the last log id that included in this snapshot,
43/// the last membership included,
44/// and a snapshot id.
45#[derive(Debug, Clone, Default, PartialEq, Eq)]
46#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
47pub struct SnapshotMeta<NID, N>
48where
49 NID: NodeId,
50 N: Node,
51{
52 /// Log entries upto which this snapshot includes, inclusive.
53 pub last_log_id: Option<LogId<NID>>,
54
55 /// The last applied membership config.
56 pub last_membership: StoredMembership<NID, N>,
57
58 /// To identify a snapshot when transferring.
59 /// Caveat: even when two snapshot is built with the same `last_log_id`, they still could be
60 /// different in bytes.
61 pub snapshot_id: SnapshotId,
62}
63
64impl<NID, N> fmt::Display for SnapshotMeta<NID, N>
65where
66 NID: NodeId,
67 N: Node,
68{
69 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
70 write!(
71 f,
72 "{{snapshot_id: {}, last_log:{}, last_membership: {}}}",
73 self.snapshot_id,
74 DisplayOption(&self.last_log_id),
75 self.last_membership
76 )
77 }
78}
79
80impl<NID, N> MessageSummary<SnapshotMeta<NID, N>> for SnapshotMeta<NID, N>
81where
82 NID: NodeId,
83 N: Node,
84{
85 fn summary(&self) -> String {
86 self.to_string()
87 }
88}
89
90impl<NID, N> SnapshotMeta<NID, N>
91where
92 NID: NodeId,
93 N: Node,
94{
95 pub fn signature(&self) -> SnapshotSignature<NID> {
96 SnapshotSignature {
97 last_log_id: self.last_log_id.clone(),
98 last_membership_log_id: self.last_membership.log_id().clone(),
99 snapshot_id: self.snapshot_id.clone(),
100 }
101 }
102
103 /// Returns a ref to the id of the last log that is included in this snasphot.
104 pub fn last_log_id(&self) -> Option<&LogId<NID>> {
105 self.last_log_id.as_ref()
106 }
107}
108
109/// The data associated with the current snapshot.
110#[derive(Debug, Clone)]
111pub struct Snapshot<C>
112where C: RaftTypeConfig
113{
114 /// metadata of a snapshot
115 pub meta: SnapshotMeta<C::NodeId, C::Node>,
116
117 /// A read handle to the associated snapshot.
118 pub snapshot: Box<C::SnapshotData>,
119}
120
121impl<C> Snapshot<C>
122where C: RaftTypeConfig
123{
124 #[allow(dead_code)]
125 pub(crate) fn new(meta: SnapshotMeta<C::NodeId, C::Node>, snapshot: Box<C::SnapshotData>) -> Self {
126 Self { meta, snapshot }
127 }
128}
129
130impl<C> fmt::Display for Snapshot<C>
131where C: RaftTypeConfig
132{
133 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
134 write!(f, "Snapshot{{meta: {}}}", self.meta)
135 }
136}
137
138/// The state about logs.
139///
140/// Invariance: last_purged_log_id <= last_applied <= last_log_id
141#[derive(Clone, Debug, Default, PartialEq, Eq)]
142pub struct LogState<C: RaftTypeConfig> {
143 /// The greatest log id that has been purged after being applied to state machine.
144 pub last_purged_log_id: Option<LogId<C::NodeId>>,
145
146 /// The log id of the last present entry if there are any entries.
147 /// Otherwise the same value as `last_purged_log_id`.
148 pub last_log_id: Option<LogId<C::NodeId>>,
149}
150
151/// A trait defining the interface for a Raft log subsystem.
152///
153/// This interface is accessed read-only from replica streams.
154///
155/// Typically, the log reader implementation as such will be hidden behind an `Arc<T>` and
156/// this interface implemented on the `Arc<T>`. It can be co-implemented with [`RaftStorage`]
157/// interface on the same cloneable object, if the underlying state machine is anyway synchronized.
158#[add_async_trait]
159pub trait RaftLogReader<C>: OptionalSend + OptionalSync + 'static
160where C: RaftTypeConfig
161{
162 /// Get a series of log entries from storage.
163 ///
164 /// The start value is inclusive in the search and the stop value is non-inclusive: `[start,
165 /// stop)`.
166 ///
167 /// Entry that is not found is allowed.
168 async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug + OptionalSend>(
169 &mut self,
170 range: RB,
171 ) -> Result<Vec<C::Entry>, StorageError<C::NodeId>>;
172
173 /// Returns log entries within range `[start, end)`, `end` is exclusive,
174 /// potentially limited by implementation-defined constraints.
175 ///
176 /// If the specified range is too large, the implementation may return only the first few log
177 /// entries to ensure the result is not excessively large.
178 ///
179 /// It must not return empty result if the input range is not empty.
180 ///
181 /// The default implementation just returns the full range of log entries.
182 async fn limited_get_log_entries(
183 &mut self,
184 start: u64,
185 end: u64,
186 ) -> Result<Vec<C::Entry>, StorageError<C::NodeId>> {
187 self.try_get_log_entries(start..end).await
188 }
189}
190
191/// A trait defining the interface for a Raft state machine snapshot subsystem.
192///
193/// This interface is accessed read-only from snapshot building task.
194///
195/// Typically, the snapshot implementation as such will be hidden behind a reference type like
196/// `Arc<T>` or `Box<T>` and this interface implemented on the reference type. It can be
197/// co-implemented with [`RaftStorage`] interface on the same cloneable object, if the underlying
198/// state machine is anyway synchronized.
199#[add_async_trait]
200pub trait RaftSnapshotBuilder<C>: OptionalSend + OptionalSync + 'static
201where C: RaftTypeConfig
202{
203 /// Build snapshot
204 ///
205 /// A snapshot has to contain state of all applied log, including membership. Usually it is just
206 /// a serialized state machine.
207 ///
208 /// Building snapshot can be done by:
209 /// - Performing log compaction, e.g. merge log entries that operates on the same key, like a
210 /// LSM-tree does,
211 /// - or by fetching a snapshot from the state machine.
212 async fn build_snapshot(&mut self) -> Result<Snapshot<C>, StorageError<C::NodeId>>;
213
214 // NOTES:
215 // This interface is geared toward small file-based snapshots. However, not all snapshots can
216 // be easily represented as a file. Probably a more generic interface will be needed to address
217 // also other needs.
218}
219
220/// A trait defining the interface for a Raft storage system.
221///
222/// Typically, the storage implementation as such will be hidden behind a `Box<T>`, `Arc<T>` or
223/// a similar, more advanced reference type and this interface implemented on that reference type.
224///
225/// All methods on the storage are called inside of Raft core task. There is no concurrency on the
226/// storage, except concurrency with snapshot builder and log reader, both created by this API.
227/// The implementation of the API has to cope with (infrequent) concurrent access from these two
228/// components.
229#[cfg_attr(
230 feature = "storage-v2",
231 deprecated(since = "0.8.4", note = "use `RaftLogStorage` and `RaftStateMachine` instead")
232)]
233#[add_async_trait]
234pub trait RaftStorage<C>: RaftLogReader<C> + OptionalSend + OptionalSync + 'static
235where C: RaftTypeConfig
236{
237 /// Log reader type.
238 type LogReader: RaftLogReader<C>;
239
240 /// Snapshot builder type.
241 type SnapshotBuilder: RaftSnapshotBuilder<C>;
242
243 // --- Vote
244
245 /// To ensure correctness: the vote must be persisted on disk before returning.
246 async fn save_vote(&mut self, vote: &Vote<C::NodeId>) -> Result<(), StorageError<C::NodeId>>;
247
248 async fn read_vote(&mut self) -> Result<Option<Vote<C::NodeId>>, StorageError<C::NodeId>>;
249
250 /// Saves the last committed log id to storage.
251 ///
252 /// # Optional feature
253 ///
254 /// If the state machine flushes state to disk before returning from `apply()`,
255 /// then the application does not need to implement this method.
256 ///
257 /// Otherwise, i.e., the state machine just relies on periodical snapshot to persist state to
258 /// disk:
259 ///
260 /// - If the `committed` log id is saved, the state machine will be recovered to the state
261 /// corresponding to this `committed` log id upon system startup, i.e., the state at the point
262 /// when the committed log id was applied.
263 ///
264 /// - If the `committed` log id is not saved, Openraft will just recover the state machine to
265 /// the state of the last snapshot taken.
266 async fn save_committed(&mut self, _committed: Option<LogId<C::NodeId>>) -> Result<(), StorageError<C::NodeId>> {
267 // By default `committed` log id is not saved
268 Ok(())
269 }
270
271 /// Return the last saved committed log id by [`Self::save_committed`].
272 async fn read_committed(&mut self) -> Result<Option<LogId<C::NodeId>>, StorageError<C::NodeId>> {
273 // By default `committed` log id is not saved and this method just return None.
274 Ok(None)
275 }
276
277 // --- Log
278
279 /// Returns the last deleted log id and the last log id.
280 ///
281 /// The impl should **not** consider the applied log id in state machine.
282 /// The returned `last_log_id` could be the log id of the last present log entry, or the
283 /// `last_purged_log_id` if there is no entry at all.
284 // NOTE: This can be made into sync, provided all state machines will use atomic read or the
285 // like.
286 async fn get_log_state(&mut self) -> Result<LogState<C>, StorageError<C::NodeId>>;
287
288 /// Get the log reader.
289 ///
290 /// The method is intentionally async to give the implementation a chance to use asynchronous
291 /// sync primitives to serialize access to the common internal object, if needed.
292 async fn get_log_reader(&mut self) -> Self::LogReader;
293
294 /// Append a payload of entries to the log.
295 ///
296 /// Though the entries will always be presented in order, each entry's index should be used to
297 /// determine its location to be written in the log.
298 ///
299 /// To ensure correctness:
300 ///
301 /// - All entries must be persisted on disk before returning.
302 ///
303 /// - There must not be a **hole** in logs. Because Raft only examine the last log id to ensure
304 /// correctness.
305 async fn append_to_log<I>(&mut self, entries: I) -> Result<(), StorageError<C::NodeId>>
306 where I: IntoIterator<Item = C::Entry> + OptionalSend;
307
308 /// Delete conflict log entries since `log_id`, inclusive.
309 ///
310 /// This method is called by a follower or learner when the local logs conflict with the
311 /// leaders.
312 ///
313 /// To ensure correctness:
314 ///
315 /// - When this function returns, the deleted logs must not be read(e.g., by
316 /// `RaftLogReader::try_get_log_entries()`) any more.
317 ///
318 /// - It must not leave a **hole** in the log. In other words, if it has to delete logs in more
319 /// than one transactions, it must delete logs in backward order. So that in a case server
320 /// crashes, it won't leave a hole.
321 async fn delete_conflict_logs_since(&mut self, log_id: LogId<C::NodeId>) -> Result<(), StorageError<C::NodeId>>;
322
323 /// Delete applied log entries upto `log_id`, inclusive.
324 ///
325 /// To ensure correctness:
326 ///
327 /// - It must not leave a **hole** in logs.
328 async fn purge_logs_upto(&mut self, log_id: LogId<C::NodeId>) -> Result<(), StorageError<C::NodeId>>;
329
330 // --- State Machine
331
332 // TODO: This can be made into sync, provided all state machines will use atomic read or the
333 // like.
334 // ---
335 /// Returns the last applied log id which is recorded in state machine, and the last applied
336 /// membership config.
337 ///
338 /// ## Correctness requirements
339 ///
340 /// It is all right to return a membership with greater log id than the
341 /// last-applied-log-id.
342 async fn last_applied_state(
343 &mut self,
344 ) -> Result<(Option<LogId<C::NodeId>>, StoredMembership<C::NodeId, C::Node>), StorageError<C::NodeId>>;
345
346 /// Apply the given payload of entries to the state machine.
347 ///
348 /// The Raft protocol guarantees that only logs which have been _committed_, that is, logs which
349 /// have been replicated to a quorum of the cluster, will be applied to the state machine.
350 ///
351 /// This is where the business logic of interacting with your application's state machine
352 /// should live. This is 100% application specific. Perhaps this is where an application
353 /// specific transaction is being started, or perhaps committed. This may be where a key/value
354 /// is being stored.
355 ///
356 /// For every entry to apply, an implementation should:
357 /// - Store the log id as last applied log id.
358 /// - Deal with the EntryPayload::Normal() log, which is business logic log.
359 /// - Store membership config in EntryPayload::Membership.
360 ///
361 /// Note that for a membership log, the implementation need to do nothing about it, except
362 /// storing it.
363 ///
364 /// An implementation may choose to persist either the state machine or the snapshot:
365 ///
366 /// - An implementation with persistent state machine: persists the state on disk before
367 /// returning from `apply_to_state_machine()`. So that a snapshot does not need to be
368 /// persistent.
369 ///
370 /// - An implementation with persistent snapshot: `apply_to_state_machine()` does not have to
371 /// persist state on disk. But every snapshot has to be persistent. And when starting up the
372 /// application, the state machine should be rebuilt from the last snapshot.
373 async fn apply_to_state_machine(&mut self, entries: &[C::Entry]) -> Result<Vec<C::R>, StorageError<C::NodeId>>;
374
375 // --- Snapshot
376
377 /// Get the snapshot builder for the state machine.
378 ///
379 /// The method is intentionally async to give the implementation a chance to use asynchronous
380 /// sync primitives to serialize access to the common internal object, if needed.
381 async fn get_snapshot_builder(&mut self) -> Self::SnapshotBuilder;
382
383 /// Create a new blank snapshot, returning a writable handle to the snapshot object.
384 ///
385 /// Raft will use this handle to receive snapshot data.
386 ///
387 /// See the [storage chapter of the guide][sto] for details on log compaction / snapshotting.
388 ///
389 /// [sto]: crate::docs::getting_started#3-implement-raftlogstorage-and-raftstatemachine
390 async fn begin_receiving_snapshot(&mut self) -> Result<Box<C::SnapshotData>, StorageError<C::NodeId>>;
391
392 /// Install a snapshot which has finished streaming from the leader.
393 ///
394 /// All other snapshots should be deleted at this point.
395 ///
396 /// ### snapshot
397 /// A snapshot created from an earlier call to `begin_receiving_snapshot` which provided the
398 /// snapshot.
399 async fn install_snapshot(
400 &mut self,
401 meta: &SnapshotMeta<C::NodeId, C::Node>,
402 snapshot: Box<C::SnapshotData>,
403 ) -> Result<(), StorageError<C::NodeId>>;
404
405 /// Get a readable handle to the current snapshot, along with its metadata.
406 ///
407 /// ### implementation algorithm
408 /// Implementing this method should be straightforward. Check the configured snapshot
409 /// directory for any snapshot files. A proper implementation will only ever have one
410 /// active snapshot, though another may exist while it is being created. As such, it is
411 /// recommended to use a file naming pattern which will allow for easily distinguishing between
412 /// the current live snapshot, and any new snapshot which is being created.
413 ///
414 /// A proper snapshot implementation will store the term, index and membership config as part
415 /// of the snapshot, which should be decoded for creating this method's response data.
416 async fn get_current_snapshot(&mut self) -> Result<Option<Snapshot<C>>, StorageError<C::NodeId>>;
417}