Skip to main content

d_engine_core/state_machine_handler/
mod.rs

1//! The `StateMachineHandler` module provides a core component for managing both
2//! write operations and read requests against the `StateMachine`.
3//!
4//! Snapshot-related responsibilities:
5//! - Creating/Deleting temporary snapshot files
6//! - Finalizing snapshot file naming and organization
7//! - Version control of snapshots
8//! - File system I/O operations for snapshots
9//! - Handling file locks and concurrency control
10//!
11//! ## Relationship Between `StateMachineHandler` and `StateMachine`
12//! The `StateMachineHandler` serves as the primary interface for interacting
13//! with the `StateMachine`. Its dual responsibilities are:
14//! 1. Applying committed log entries to the `StateMachine` to maintain state consistency
15//! 2. Directly servicing client read requests through state machine queries
16//!
17//! While maintaining separation from the `StateMachine` itself, the handler
18//! leverages the `StateMachine` trait for both state updates and read
19//! operations. This design centralizes all state access points while preserving
20//! separation of concerns.
21//!
22//! ## Design Recommendations
23//! - **Customization Focus**: Developers should prioritize extending the `StateMachine`
24//!   implementation rather than modifying the `StateMachineHandler`. The handler is intentionally
25//!   generic and battle-tested, serving as:
26//!   - Write coordinator for log application
27//!   - Read router for direct state queries
28//! - **State Access Unification**: All state access (both write and read) should flow through the
29//!   handler to leverage:
30//!   - Consistent concurrency control
31//!   - Atomic visibility guarantees
32//!   - Linearizable read optimizations
33
34mod default_state_machine_handler;
35mod snapshot_assembler;
36mod snapshot_guard;
37mod snapshot_policy;
38mod worker;
39
40pub use default_state_machine_handler::*;
41pub(crate) use snapshot_assembler::*;
42pub(crate) use snapshot_guard::*;
43pub use snapshot_policy::*;
44pub use worker::*;
45
46#[cfg(test)]
47mod default_state_machine_handler_test;
48#[cfg(test)]
49mod snapshot_assembler_test;
50#[cfg(test)]
51mod wait_applied_test;
52#[cfg(test)]
53mod worker_test;
54
55use d_engine_proto::client::ClientResult;
56use d_engine_proto::server::storage::SnapshotChunk;
57use d_engine_proto::server::storage::SnapshotMetadata;
58use futures::stream::BoxStream;
59#[cfg(any(test, feature = "__test_support"))]
60use mockall::automock;
61use tonic::async_trait;
62
63use super::NewCommitData;
64use crate::ApplyResult;
65use crate::Result;
66use crate::TypeConfig;
67
68#[cfg_attr(any(test, feature = "__test_support"), automock)]
69#[async_trait]
70pub trait StateMachineHandler<T>: Send + Sync + 'static
71where
72    T: TypeConfig,
73{
74    fn last_applied(&self) -> u64;
75
76    /// Updates the highest known committed log index that hasn't been applied yet
77    fn update_pending(
78        &self,
79        new_commit: u64,
80    );
81
82    /// Waits until the state machine has applied entries up to the target index.
83    /// Returns error if timeout is reached before the target is applied.
84    ///
85    /// This is used to ensure linearizable reads: after leader confirms a log entry
86    /// is committed (via quorum), we must wait for the state machine to apply it
87    /// before reading to guarantee the read reflects all committed writes.
88    async fn wait_applied(
89        &self,
90        target_index: u64,
91        timeout: std::time::Duration,
92    ) -> Result<()>;
93
94    /// Applies a batch of committed log entries to the state machine
95    ///
96    /// Returns execution results for each entry in the same order as input.
97    /// The returned vector length MUST equal the input chunk length.
98    async fn apply_chunk(
99        &self,
100        chunk: Vec<d_engine_proto::common::Entry>,
101    ) -> Result<Vec<ApplyResult>>;
102
103    /// Reads values from the state machine for given keys
104    /// Returns None if any key doesn't exist
105    fn read_from_state_machine(
106        &self,
107        keys: Vec<bytes::Bytes>,
108    ) -> Option<Vec<ClientResult>>;
109
110    /// Receives and installs a snapshot stream pushed by the leader
111    /// Used when leader proactively sends snapshot updates to followers
112    async fn apply_snapshot_stream_from_leader(
113        &self,
114        current_term: u64,
115        stream: Box<tonic::Streaming<SnapshotChunk>>,
116        ack_tx: tokio::sync::mpsc::Sender<d_engine_proto::server::storage::SnapshotAck>,
117        config: &crate::SnapshotConfig,
118    ) -> Result<()>;
119
120    /// Determines if a snapshot should be created based on new commit data
121    fn should_snapshot(
122        &self,
123        new_commit_data: NewCommitData,
124    ) -> bool;
125
126    /// Asynchronously creates a state machine snapshot with the following steps:
127    /// 1. Acquires a write lock to ensure exclusive access during snapshot creation
128    /// 2. Prepares temporary and final snapshot file paths using:
129    ///    - Last applied index/term from state machine
130    /// 3. Generates snapshot data to temporary file using state machine implementation
131    /// 4. Atomically renames temporary file to final snapshot file to ensure consistency
132    /// 5. Cleans up old snapshots based on last_included_index, retaining only the latest snapshot
133    ///    files as specified by cleanup_retain_count.
134    ///
135    /// Returns new Snapshot metadata and final snapshot path to indicate the new snapshot file has
136    /// been successfully created
137    async fn create_snapshot(&self) -> Result<(SnapshotMetadata, std::path::PathBuf)>;
138
139    /// Cleans up old snapshots before specified version
140    async fn cleanup_snapshot(
141        &self,
142        before_version: u64,
143        snapshot_dir: &std::path::Path,
144        snapshot_dir_prefix: &str,
145    ) -> crate::Result<()>;
146
147    /// Retrieves metadata of the latest valid snapshot
148    fn get_latest_snapshot_metadata(&self) -> Option<SnapshotMetadata>;
149
150    /// Loads snapshot data as a stream of chunks
151    async fn load_snapshot_data(
152        &self,
153        metadata: SnapshotMetadata,
154    ) -> Result<BoxStream<'static, Result<SnapshotChunk>>>;
155
156    /// Loads a specific chunk of snapshot data by sequence number
157    #[allow(unused)]
158    async fn load_snapshot_chunk(
159        &self,
160        metadata: &SnapshotMetadata,
161        seq: u32,
162    ) -> Result<SnapshotChunk>;
163
164    fn pending_range(&self) -> Option<std::ops::RangeInclusive<u64>>;
165}