d_engine_core/state_machine_handler/mod.rs
1//! The `StateMachineHandler` module provides a core component for managing both
2//! write operations and read requests against the `StateMachine`.
3//!
4//! Snapshot-related responsibilities:
5//! - Creating/Deleting temporary snapshot files
6//! - Finalizing snapshot file naming and organization
7//! - Version control of snapshots
8//! - File system I/O operations for snapshots
9//! - Handling file locks and concurrency control
10//!
11//! ## Relationship Between `StateMachineHandler` and `StateMachine`
12//! The `StateMachineHandler` serves as the primary interface for interacting
13//! with the `StateMachine`. Its dual responsibilities are:
14//! 1. Applying committed log entries to the `StateMachine` to maintain state consistency
15//! 2. Directly servicing client read requests through state machine queries
16//!
17//! While maintaining separation from the `StateMachine` itself, the handler
18//! leverages the `StateMachine` trait for both state updates and read
19//! operations. This design centralizes all state access points while preserving
20//! separation of concerns.
21//!
22//! ## Design Recommendations
23//! - **Customization Focus**: Developers should prioritize extending the `StateMachine`
24//!   implementation rather than modifying the `StateMachineHandler`. The handler is intentionally
25//!   generic and battle-tested, serving as:
26//!   - Write coordinator for log application
27//!   - Read router for direct state queries
28//! - **State Access Unification**: All state access (both write and read) should flow through the
29//!   handler to leverage:
30//!   - Consistent concurrency control
31//!   - Atomic visibility guarantees
32//!   - Linearizable read optimizations
33
34mod default_state_machine_handler;
35mod snapshot_assembler;
36mod snapshot_guard;
37mod snapshot_policy;
38
39pub use default_state_machine_handler::*;
40pub(crate) use snapshot_assembler::*;
41pub(crate) use snapshot_guard::*;
42pub use snapshot_policy::*;
43
44#[cfg(test)]
45mod default_state_machine_handler_test;
46#[cfg(test)]
47mod snapshot_assembler_test;
48#[cfg(test)]
49mod wait_applied_test;
50
51use std::sync::Arc;
52
53use d_engine_proto::client::ClientResult;
54use d_engine_proto::common::LogId;
55use d_engine_proto::server::storage::PurgeLogRequest;
56use d_engine_proto::server::storage::PurgeLogResponse;
57use d_engine_proto::server::storage::SnapshotChunk;
58use d_engine_proto::server::storage::SnapshotMetadata;
59use futures::stream::BoxStream;
60#[cfg(any(test, feature = "__test_support"))]
61use mockall::automock;
62use tonic::async_trait;
63
64use super::NewCommitData;
65use crate::Result;
66use crate::TypeConfig;
67use crate::alias::ROF;
68
/// Interface through which committed log entries are applied to the state
/// machine, client reads are served, and snapshot / log-purge operations are
/// carried out.
///
/// Implementors must be `Send + Sync + 'static`. When compiled under
/// `cfg(test)` or with the `__test_support` feature, `mockall`'s `automock`
/// generates a mock implementation of this trait.
69#[cfg_attr(any(test, feature = "__test_support"), automock)]
70#[async_trait]
71pub trait StateMachineHandler<T>: Send + Sync + 'static
72where
73 T: TypeConfig,
74{
    /// Returns the last-applied position as a `u64`.
    ///
    /// NOTE(review): presumably the index of the most recent log entry applied
    /// to the state machine — confirm against the implementation.
75 fn last_applied(&self) -> u64;
76
77 /// Updates the highest known committed log index that hasn't been applied yet
    ///
    /// NOTE(review): `new_commit` is presumably the newly committed log index
    /// reported by the caller — confirm against the implementation.
78 fn update_pending(
79 &self,
80 new_commit: u64,
81 );
82
83 /// Waits until the state machine has applied entries up to the target index.
84 /// Returns error if timeout is reached before the target is applied.
85 ///
86 /// This is used to ensure linearizable reads: after leader confirms a log entry
87 /// is committed (via quorum), we must wait for the state machine to apply it
88 /// before reading to guarantee the read reflects all committed writes.
    ///
    /// # Arguments
    /// - `target_index`: log index that must be applied before this returns `Ok`
    /// - `timeout`: maximum time to wait for `target_index` to be applied
89 async fn wait_applied(
90 &self,
91 target_index: u64,
92 timeout: std::time::Duration,
93 ) -> Result<()>;
94
95 /// Applies a batch of committed log entries to the state machine
96 async fn apply_chunk(
97 &self,
98 chunk: Vec<d_engine_proto::common::Entry>,
99 ) -> Result<()>;
100
101 /// Reads values from the state machine for given keys
102 /// Returns None if any key doesn't exist
103 fn read_from_state_machine(
104 &self,
105 keys: Vec<bytes::Bytes>,
106 ) -> Option<Vec<ClientResult>>;
107
108 /// Receives and installs a snapshot stream pushed by the leader
109 /// Used when leader proactively sends snapshot updates to followers
    ///
    /// NOTE(review): `ack_tx` carries `SnapshotAck` messages, presumably
    /// acknowledgements for received chunks sent back towards the leader;
    /// `current_term` is presumably this node's current Raft term — confirm
    /// both against the implementation.
110 async fn apply_snapshot_stream_from_leader(
111 &self,
112 current_term: u64,
113 stream: Box<tonic::Streaming<SnapshotChunk>>,
114 ack_tx: tokio::sync::mpsc::Sender<d_engine_proto::server::storage::SnapshotAck>,
115 config: &crate::SnapshotConfig,
116 ) -> Result<()>;
117
118 /// Determines if a snapshot should be created based on new commit data
119 fn should_snapshot(
120 &self,
121 new_commit_data: NewCommitData,
122 ) -> bool;
123
124 /// Asynchronously creates a state machine snapshot with the following steps:
125 /// 1. Acquires a write lock to ensure exclusive access during snapshot creation
126 /// 2. Prepares temporary and final snapshot file paths using:
127 ///    - Last applied index/term from state machine
128 /// 3. Generates snapshot data to temporary file using state machine implementation
129 /// 4. Atomically renames temporary file to final snapshot file to ensure consistency
130 /// 5. Cleans up old snapshots based on last_included_index, retaining only the latest snapshot
131 ///    files as specified by cleanup_retain_count.
132 ///
133 /// Returns new Snapshot metadata and final snapshot path to indicate the new snapshot file has
134 /// been successfully created
135 async fn create_snapshot(&self) -> Result<(SnapshotMetadata, std::path::PathBuf)>;
136
137 /// Cleans up old snapshots before specified version
    ///
    /// NOTE(review): presumably `snapshot_dir` is the directory that is scanned
    /// and `snapshot_dir_prefix` the name prefix identifying snapshot entries
    /// inside it — confirm against the implementation.
138 async fn cleanup_snapshot(
139 &self,
140 before_version: u64,
141 snapshot_dir: &std::path::Path,
142 snapshot_dir_prefix: &str,
143 ) -> crate::Result<()>;
144
145 /// Validates if a log purge request from leader is authorized
    ///
    /// NOTE(review): the `bool` in the result presumably means "authorized";
    /// which conditions yield `Ok(false)` versus `Err` is not visible from
    /// this declaration — confirm against the implementation.
146 async fn validate_purge_request(
147 &self,
148 current_term: u64,
149 leader_id: Option<u32>,
150 req: &PurgeLogRequest,
151 ) -> Result<bool>;
152
153 /// Processes log purge requests (for non-leader nodes)
    // NOTE(review): the reason for `allow(unused)` is not visible here —
    // confirm whether the attribute is still needed.
154 #[allow(unused)]
155 async fn handle_purge_request(
156 &self,
157 current_term: u64,
158 leader_id: Option<u32>,
159 last_purged: Option<LogId>,
160 req: &PurgeLogRequest,
161 raft_log: &Arc<ROF<T>>,
162 ) -> Result<PurgeLogResponse>;
163
164 /// Retrieves metadata of the latest valid snapshot
    ///
    /// NOTE(review): `None` presumably means no valid snapshot is available —
    /// confirm against the implementation.
165 fn get_latest_snapshot_metadata(&self) -> Option<SnapshotMetadata>;
166
167 /// Loads snapshot data as a stream of chunks
168 async fn load_snapshot_data(
169 &self,
170 metadata: SnapshotMetadata,
171 ) -> Result<BoxStream<'static, Result<SnapshotChunk>>>;
172
173 /// Loads a specific chunk of snapshot data by sequence number
    // NOTE(review): the reason for `allow(unused)` is not visible here —
    // confirm whether the attribute is still needed.
174 #[allow(unused)]
175 async fn load_snapshot_chunk(
176 &self,
177 metadata: &SnapshotMetadata,
178 seq: u32,
179 ) -> Result<SnapshotChunk>;
180
    /// Returns an optional inclusive range of `u64` values.
    ///
    /// NOTE(review): presumably the log indexes that are committed but not yet
    /// applied, with `None` when nothing is pending — confirm against the
    /// implementation.
181 fn pending_range(&self) -> Option<std::ops::RangeInclusive<u64>>;
182}