d_engine_core/state_machine_handler/
mod.rs

1//! The `StateMachineHandler` module provides a core component for managing both
2//! write operations and read requests against the `StateMachine`.
3//!
4//! Snapshot-related responsibilities:
5//! - Creating/Deleting temporary snapshot files
6//! - Finalizing snapshot file naming and organization
7//! - Version control of snapshots
8//! - File system I/O operations for snapshots
9//! - Handling file locks and concurrency control
10//!
11//! ## Relationship Between `StateMachineHandler` and `StateMachine`
12//! The `StateMachineHandler` serves as the primary interface for interacting
13//! with the `StateMachine`. Its dual responsibilities are:
14//! 1. Applying committed log entries to the `StateMachine` to maintain state consistency
15//! 2. Directly servicing client read requests through state machine queries
16//!
17//! While maintaining separation from the `StateMachine` itself, the handler
18//! leverages the `StateMachine` trait for both state updates and read
19//! operations. This design centralizes all state access points while preserving
20//! separation of concerns.
21//!
22//! ## Design Recommendations
23//! - **Customization Focus**: Developers should prioritize extending the `StateMachine`
24//!   implementation rather than modifying the `StateMachineHandler`. The handler is intentionally
25//!   generic and battle-tested, serving as:
26//!   - Write coordinator for log application
27//!   - Read router for direct state queries
28//! - **State Access Unification**: All state access (both write and read) should flow through the
29//!   handler to leverage:
30//!   - Consistent concurrency control
31//!   - Atomic visibility guarantees
32//!   - Linearizable read optimizations
33
34mod default_state_machine_handler;
35mod snapshot_assembler;
36mod snapshot_guard;
37mod snapshot_policy;
38
39pub use default_state_machine_handler::*;
40pub(crate) use snapshot_assembler::*;
41pub(crate) use snapshot_guard::*;
42pub use snapshot_policy::*;
43
44#[cfg(test)]
45mod default_state_machine_handler_test;
46#[cfg(test)]
47mod snapshot_assembler_test;
48#[cfg(test)]
49mod wait_applied_test;
50
51use std::sync::Arc;
52
53use d_engine_proto::client::ClientResult;
54use d_engine_proto::common::LogId;
55use d_engine_proto::server::storage::PurgeLogRequest;
56use d_engine_proto::server::storage::PurgeLogResponse;
57use d_engine_proto::server::storage::SnapshotChunk;
58use d_engine_proto::server::storage::SnapshotMetadata;
59use futures::stream::BoxStream;
60#[cfg(any(test, feature = "__test_support"))]
61use mockall::automock;
62use tonic::async_trait;
63
64use super::NewCommitData;
65use crate::Result;
66use crate::TypeConfig;
67use crate::alias::ROF;
68
/// Interface used to drive a state machine: applying committed log entries,
/// serving reads, and managing snapshots and log purge requests.
///
/// Implementors must be `Send + Sync + 'static`. The async methods are enabled by
/// `#[async_trait]`. When compiled for tests or with the `__test_support` feature,
/// mockall's `automock` additionally generates a mock implementation of this trait.
69#[cfg_attr(any(test, feature = "__test_support"), automock)]
70#[async_trait]
71pub trait StateMachineHandler<T>: Send + Sync + 'static
72where
73    T: TypeConfig,
74{
    /// Returns the handler's last-applied position as a `u64`.
    ///
    /// NOTE(review): presumably the index of the most recent log entry applied to
    /// the state machine — confirm against the implementation.
75    fn last_applied(&self) -> u64;
76
77    /// Updates the highest known committed log index that has not been applied yet.
    ///
    /// `new_commit` is the commit index to record.
78    fn update_pending(
79        &self,
80        new_commit: u64,
81    );
82
83    /// Waits until the state machine has applied entries up to the target index.
84    /// Returns error if timeout is reached before the target is applied.
85    ///
86    /// This is used to ensure linearizable reads: after leader confirms a log entry
87    /// is committed (via quorum), we must wait for the state machine to apply it
88    /// before reading to guarantee the read reflects all committed writes.
    ///
    /// # Parameters
    /// - `target_index`: log index that must be applied before this returns `Ok(())`.
    /// - `timeout`: maximum time to wait for `target_index` to be applied.
89    async fn wait_applied(
90        &self,
91        target_index: u64,
92        timeout: std::time::Duration,
93    ) -> Result<()>;
94
95    /// Applies a batch of committed log entries to the state machine.
    ///
    /// `chunk` is taken by value, so the caller hands ownership of the entries to
    /// the handler.
96    async fn apply_chunk(
97        &self,
98        chunk: Vec<d_engine_proto::common::Entry>,
99    ) -> Result<()>;
100
101    /// Reads values from the state machine for the given keys.
102    /// Returns `None` if any key doesn't exist.
    ///
    /// NOTE(review): the `None`-on-any-missing-key behavior is stated by the
    /// existing docs but is decided by the implementation — confirm there.
103    fn read_from_state_machine(
104        &self,
105        keys: Vec<bytes::Bytes>,
106    ) -> Option<Vec<ClientResult>>;
107
108    /// Receives and installs a snapshot stream pushed by the leader.
109    /// Used when the leader proactively sends snapshot updates to followers.
    ///
    /// # Parameters
    /// - `current_term`: the term supplied by the caller (presumably this node's
    ///   current term, used to validate the incoming stream — confirm).
    /// - `stream`: the incoming gRPC stream of `SnapshotChunk` messages.
    /// - `ack_tx`: channel on which `SnapshotAck` messages are sent.
    /// - `config`: snapshot configuration to use for this transfer.
110    async fn apply_snapshot_stream_from_leader(
111        &self,
112        current_term: u64,
113        stream: Box<tonic::Streaming<SnapshotChunk>>,
114        ack_tx: tokio::sync::mpsc::Sender<d_engine_proto::server::storage::SnapshotAck>,
115        config: &crate::SnapshotConfig,
116    ) -> Result<()>;
117
118    /// Determines whether a snapshot should be created based on new commit data.
    ///
    /// Returns `true` when a snapshot should be taken. The decision criteria are
    /// defined by the implementation.
119    fn should_snapshot(
120        &self,
121        new_commit_data: NewCommitData,
122    ) -> bool;
123
124    /// Asynchronously creates a state machine snapshot with the following steps:
125    /// 1. Acquires a write lock to ensure exclusive access during snapshot creation
126    /// 2. Prepares temporary and final snapshot file paths using:
127    ///    - Last applied index/term from state machine
128    /// 3. Generates snapshot data to temporary file using state machine implementation
129    /// 4. Atomically renames temporary file to final snapshot file to ensure consistency
130    /// 5. Cleans up old snapshots based on last_included_index, retaining only the latest snapshot
131    ///    files as specified by cleanup_retain_count.
132    ///
133    /// Returns the new snapshot metadata and the final snapshot path to indicate that the new
134    /// snapshot file has been successfully created.
    ///
    /// NOTE(review): the steps above describe the intended implementation; this
    /// trait only fixes the signature, so verify them against the implementor.
135    async fn create_snapshot(&self) -> Result<(SnapshotMetadata, std::path::PathBuf)>;
136
137    /// Cleans up old snapshots before the specified version.
    ///
    /// # Parameters
    /// - `before_version`: snapshots older than this version are candidates for
    ///   removal (whether the bound is inclusive is implementation-defined).
    /// - `snapshot_dir`: directory passed in by the caller (presumably where
    ///   snapshots are stored — confirm).
    /// - `snapshot_dir_prefix`: name prefix passed in by the caller (presumably
    ///   used to recognize snapshot entries in `snapshot_dir` — confirm).
138    async fn cleanup_snapshot(
139        &self,
140        before_version: u64,
141        snapshot_dir: &std::path::Path,
142        snapshot_dir_prefix: &str,
143    ) -> crate::Result<()>;
144
145    /// Validates if a log purge request from the leader is authorized.
    ///
    /// Returns a boolean verdict inside `Result`; presumably `Ok(true)` means the
    /// request is accepted — confirm against the implementation.
146    async fn validate_purge_request(
147        &self,
148        current_term: u64,
149        leader_id: Option<u32>,
150        req: &PurgeLogRequest,
151    ) -> Result<bool>;
152
153    /// Processes log purge requests (for non-leader nodes).
    ///
    /// Takes the caller's `current_term`, the known `leader_id` (if any), the
    /// `last_purged` log id (if any), the request itself, and a shared handle to
    /// the raft log, and produces the `PurgeLogResponse` to send back.
    // NOTE(review): the reason for `#[allow(unused)]` is not visible in this file;
    // document why it is needed or remove it.
154    #[allow(unused)]
155    async fn handle_purge_request(
156        &self,
157        current_term: u64,
158        leader_id: Option<u32>,
159        last_purged: Option<LogId>,
160        req: &PurgeLogRequest,
161        raft_log: &Arc<ROF<T>>,
162    ) -> Result<PurgeLogResponse>;
163
164    /// Retrieves metadata of the latest valid snapshot.
    ///
    /// Returns `None` when no such metadata is available.
165    fn get_latest_snapshot_metadata(&self) -> Option<SnapshotMetadata>;
166
167    /// Loads snapshot data as a stream of chunks.
    ///
    /// On success yields a boxed `'static` stream whose items are
    /// `Result<SnapshotChunk>`, so individual chunks can fail independently of
    /// the initial call.
168    async fn load_snapshot_data(
169        &self,
170        metadata: SnapshotMetadata,
171    ) -> Result<BoxStream<'static, Result<SnapshotChunk>>>;
172
173    /// Loads a specific chunk of snapshot data by sequence number.
    ///
    /// `metadata` identifies the snapshot and `seq` is the sequence number of the
    /// chunk to load.
    // NOTE(review): the reason for `#[allow(unused)]` is not visible in this file;
    // document why it is needed or remove it.
174    #[allow(unused)]
175    async fn load_snapshot_chunk(
176        &self,
177        metadata: &SnapshotMetadata,
178        seq: u32,
179    ) -> Result<SnapshotChunk>;
180
    /// Returns the handler's pending range as an inclusive range of `u64`, if any.
    ///
    /// NOTE(review): presumably the committed-but-not-yet-applied log indexes, with
    /// `None` meaning nothing is pending — confirm against the implementation.
181    fn pending_range(&self) -> Option<std::ops::RangeInclusive<u64>>;
182}