d_engine_core/state_machine_handler/mod.rs
1//! The `StateMachineHandler` module provides a core component for managing both
2//! write operations and read requests against the `StateMachine`.
3//!
//! Snapshot-related responsibilities:
//! - Creating/deleting temporary snapshot files
//! - Finalizing snapshot file naming and organization
//! - Version control of snapshots
//! - File system I/O operations for snapshots
//! - Handling file locks and concurrency control
10//!
11//! ## Relationship Between `StateMachineHandler` and `StateMachine`
12//! The `StateMachineHandler` serves as the primary interface for interacting
13//! with the `StateMachine`. Its dual responsibilities are:
14//! 1. Applying committed log entries to the `StateMachine` to maintain state consistency
15//! 2. Directly servicing client read requests through state machine queries
16//!
17//! While maintaining separation from the `StateMachine` itself, the handler
18//! leverages the `StateMachine` trait for both state updates and read
19//! operations. This design centralizes all state access points while preserving
20//! separation of concerns.
21//!
//! ## Design Recommendations
//! - **Customization Focus**: Developers should prioritize extending the `StateMachine`
//!   implementation rather than modifying the `StateMachineHandler`. The handler is
//!   intentionally generic and battle-tested, serving as:
//!   - Write coordinator for log application
//!   - Read router for direct state queries
//! - **State Access Unification**: All state access (both writes and reads) should flow
//!   through the handler to leverage:
//!   - Consistent concurrency control
//!   - Atomic visibility guarantees
//!   - Linearizable read optimizations
32//! - Linearizable read optimizations
33
34mod default_state_machine_handler;
35mod snapshot_assembler;
36mod snapshot_guard;
37mod snapshot_policy;
38mod worker;
39
40pub use default_state_machine_handler::*;
41pub(crate) use snapshot_assembler::*;
42pub(crate) use snapshot_guard::*;
43pub use snapshot_policy::*;
44pub use worker::*;
45
46#[cfg(test)]
47mod default_state_machine_handler_test;
48#[cfg(test)]
49mod snapshot_assembler_test;
50#[cfg(test)]
51mod wait_applied_test;
52#[cfg(test)]
53mod worker_test;
54
55use d_engine_proto::client::ClientResult;
56use d_engine_proto::server::storage::SnapshotChunk;
57use d_engine_proto::server::storage::SnapshotMetadata;
58use futures::stream::BoxStream;
59#[cfg(any(test, feature = "__test_support"))]
60use mockall::automock;
61use tonic::async_trait;
62
63use super::NewCommitData;
64use crate::ApplyResult;
65use crate::Result;
66use crate::TypeConfig;
67
68#[cfg_attr(any(test, feature = "__test_support"), automock)]
69#[async_trait]
70pub trait StateMachineHandler<T>: Send + Sync + 'static
71where
72 T: TypeConfig,
73{
74 fn last_applied(&self) -> u64;
75
76 /// Updates the highest known committed log index that hasn't been applied yet
77 fn update_pending(
78 &self,
79 new_commit: u64,
80 );
81
82 /// Waits until the state machine has applied entries up to the target index.
83 /// Returns error if timeout is reached before the target is applied.
84 ///
85 /// This is used to ensure linearizable reads: after leader confirms a log entry
86 /// is committed (via quorum), we must wait for the state machine to apply it
87 /// before reading to guarantee the read reflects all committed writes.
88 async fn wait_applied(
89 &self,
90 target_index: u64,
91 timeout: std::time::Duration,
92 ) -> Result<()>;
93
94 /// Applies a batch of committed log entries to the state machine
95 ///
96 /// Returns execution results for each entry in the same order as input.
97 /// The returned vector length MUST equal the input chunk length.
98 async fn apply_chunk(
99 &self,
100 chunk: Vec<d_engine_proto::common::Entry>,
101 ) -> Result<Vec<ApplyResult>>;
102
103 /// Reads values from the state machine for given keys
104 /// Returns None if any key doesn't exist
105 fn read_from_state_machine(
106 &self,
107 keys: Vec<bytes::Bytes>,
108 ) -> Option<Vec<ClientResult>>;
109
110 /// Receives and installs a snapshot stream pushed by the leader
111 /// Used when leader proactively sends snapshot updates to followers
112 async fn apply_snapshot_stream_from_leader(
113 &self,
114 current_term: u64,
115 stream: Box<tonic::Streaming<SnapshotChunk>>,
116 ack_tx: tokio::sync::mpsc::Sender<d_engine_proto::server::storage::SnapshotAck>,
117 config: &crate::SnapshotConfig,
118 ) -> Result<()>;
119
120 /// Determines if a snapshot should be created based on new commit data
121 fn should_snapshot(
122 &self,
123 new_commit_data: NewCommitData,
124 ) -> bool;
125
126 /// Asynchronously creates a state machine snapshot with the following steps:
127 /// 1. Acquires a write lock to ensure exclusive access during snapshot creation
128 /// 2. Prepares temporary and final snapshot file paths using:
129 /// - Last applied index/term from state machine
130 /// 3. Generates snapshot data to temporary file using state machine implementation
131 /// 4. Atomically renames temporary file to final snapshot file to ensure consistency
132 /// 5. Cleans up old snapshots based on last_included_index, retaining only the latest snapshot
133 /// files as specified by cleanup_retain_count.
134 ///
135 /// Returns new Snapshot metadata and final snapshot path to indicate the new snapshot file has
136 /// been successfully created
137 async fn create_snapshot(&self) -> Result<(SnapshotMetadata, std::path::PathBuf)>;
138
139 /// Cleans up old snapshots before specified version
140 async fn cleanup_snapshot(
141 &self,
142 before_version: u64,
143 snapshot_dir: &std::path::Path,
144 snapshot_dir_prefix: &str,
145 ) -> crate::Result<()>;
146
147 /// Retrieves metadata of the latest valid snapshot
148 fn get_latest_snapshot_metadata(&self) -> Option<SnapshotMetadata>;
149
150 /// Loads snapshot data as a stream of chunks
151 async fn load_snapshot_data(
152 &self,
153 metadata: SnapshotMetadata,
154 ) -> Result<BoxStream<'static, Result<SnapshotChunk>>>;
155
156 /// Loads a specific chunk of snapshot data by sequence number
157 #[allow(unused)]
158 async fn load_snapshot_chunk(
159 &self,
160 metadata: &SnapshotMetadata,
161 seq: u32,
162 ) -> Result<SnapshotChunk>;
163
164 fn pending_range(&self) -> Option<std::ops::RangeInclusive<u64>>;
165}