Skip to main content

d_engine_core/
event.rs

1use std::path::PathBuf;
2
3use crate::client::{ClientReadRequest, ClientResponse, ClientWriteRequest};
4use d_engine_proto::common::LogId;
5use d_engine_proto::server::cluster::ClusterConfChangeRequest;
6use d_engine_proto::server::cluster::ClusterConfUpdateResponse;
7use d_engine_proto::server::cluster::ClusterMembership;
8use d_engine_proto::server::cluster::JoinRequest;
9use d_engine_proto::server::cluster::JoinResponse;
10use d_engine_proto::server::cluster::LeaderDiscoveryRequest;
11use d_engine_proto::server::cluster::LeaderDiscoveryResponse;
12use d_engine_proto::server::cluster::MetadataRequest;
13use d_engine_proto::server::election::VoteRequest;
14use d_engine_proto::server::election::VoteResponse;
15use d_engine_proto::server::replication::AppendEntriesRequest;
16use d_engine_proto::server::replication::AppendEntriesResponse;
17use d_engine_proto::server::storage::SnapshotAck;
18use d_engine_proto::server::storage::SnapshotChunk;
19use d_engine_proto::server::storage::SnapshotMetadata;
20use d_engine_proto::server::storage::SnapshotResponse;
21use tonic::Status;
22
23use crate::ApplyResult;
24use crate::MaybeCloneOneshotSender;
25use crate::Result;
26use crate::ScanResult;
27use crate::StreamResponseSender;
28
/// Snapshot of the state accompanying a commit-index advance.
///
/// This is the payload of `RoleEvent::NotifyNewCommitIndex`.
#[derive(Clone, Debug, PartialEq)]
pub struct NewCommitData {
    /// The log index that is now committed.
    pub new_commit_index: u64,
    /// The node's role, stored as a raw `i32`.
    /// NOTE(review): presumably a protobuf role enum value — confirm against callers.
    pub role: i32,
    /// The node's current term when this update was produced.
    pub current_term: u64,
}
35
36/// Client commands that require batching for performance
37/// Separated from internal RaftEvent for drain-driven processing
38#[derive(Debug)]
39pub enum ClientCmd {
40    Propose(
41        ClientWriteRequest,
42        MaybeCloneOneshotSender<std::result::Result<ClientResponse, Status>>,
43    ),
44    Read(
45        ClientReadRequest,
46        MaybeCloneOneshotSender<std::result::Result<ClientResponse, Status>>,
47    ),
48    Scan(
49        bytes::Bytes,
50        MaybeCloneOneshotSender<std::result::Result<ScanResult, Status>>,
51    ),
52}
53
54#[derive(Debug)]
55#[allow(dead_code)]
56pub enum RoleEvent {
57    BecomeFollower(Option<u32>), // BecomeFollower(Option<leader_id>)
58    BecomeCandidate,
59    BecomeLeader,
60    BecomeLearner,
61
62    NotifyNewCommitIndex(NewCommitData),
63
64    /// Notify when follower/learner confirms leader via committed vote
65    /// Triggered when committed vote changes from false to true
66    /// No state transition - pure notification for watch channel
67    LeaderDiscovered(u32, u64), // (leader_id, term)
68
69    ReprocessEvent(Box<RaftEvent>), //Replay the raft event when step down as another role
70
71    /// Notify Raft loop that log entries up to `durable_index` are crash-safe (fsync complete).
72    /// Sent by batch_processor after flush completes.
73    /// Follower/Learner: triggers pending ACK send. Leader: triggers commit re-calculation.
74    LogFlushed {
75        durable_index: u64,
76    },
77
78    /// AppendEntries result from a per-follower ReplicationWorker back to the Raft loop.
79    /// Leader processes this in handle_append_result: updates match_index, re-calculates commit,
80    /// and drains pending_client_writes when quorum is achieved.
81    AppendResult {
82        follower_id: u32,
83        result: Result<AppendEntriesResponse>,
84    },
85
86    /// Snapshot push result from a per-follower ReplicationWorker back to the Raft loop.
87    /// Emitted after `transport.send_snapshot` completes (success or failure).
88    /// Leader uses this to clear the per-worker `snapshot_in_progress` flag if needed.
89    SnapshotPushCompleted {
90        peer_id: u32,
91        success: bool,
92    },
93
94    /// Noop entry committed — leader has confirmed quorum leadership.
95    /// Sent by LeaderState::drain_commit_actions when the noop log index is committed.
96    /// Raft loop responds by calling on_noop_committed() + notify_leader_change().
97    NoopCommitted {
98        term: u64,
99    },
100
101    /// State machine apply completed — processed at P2 (unbounded role_tx) to avoid
102    /// priority inversion: AppendEntries RPCs at P4 (bounded event_tx) must not starve
103    /// internal commit-driven events.
104    ApplyCompleted {
105        last_index: u64,
106        results: Vec<ApplyResult>,
107    },
108
109    /// Bidi replication stream to a follower broke (network disconnect or error).
110    /// Emitted by the replication worker's recv task when it gets an Err from the stream.
111    /// Raft loop resets `next_index[peer] = match_index[peer] + 1` so that the next
112    /// heartbeat re-sends any unACKed entries.  Worker handles reconnection internally.
113    PeerStreamError {
114        peer_id: u32,
115    },
116
117    /// A peer's connection failure count crossed zombie_threshold.
118    /// Emitted by RaftHealthMonitor (server layer) via an injected `Sender<u32>`.
119    /// Leader responds by proposing a BatchRemove config change for that node.
120    ZombieDetected(u32),
121
122    /// Fatal error from SM worker — node must shutdown.
123    /// Sent via role_tx (P2) so it is not blocked behind external RPCs on event_tx (P4).
124    FatalError {
125        source: String,
126        error: String,
127    },
128}
129
130#[derive(Debug)]
131pub enum RaftEvent {
132    ReceiveVoteRequest(
133        VoteRequest,
134        MaybeCloneOneshotSender<std::result::Result<VoteResponse, Status>>,
135    ),
136
137    ClusterConf(
138        MetadataRequest,
139        MaybeCloneOneshotSender<std::result::Result<ClusterMembership, Status>>,
140    ),
141
142    ClusterConfUpdate(
143        ClusterConfChangeRequest,
144        MaybeCloneOneshotSender<std::result::Result<ClusterConfUpdateResponse, Status>>,
145    ),
146
147    AppendEntries(
148        AppendEntriesRequest,
149        MaybeCloneOneshotSender<std::result::Result<AppendEntriesResponse, Status>>,
150    ),
151
152    // Response snapshot stream from Leader
153    InstallSnapshotChunk(
154        Box<tonic::Streaming<SnapshotChunk>>,
155        MaybeCloneOneshotSender<std::result::Result<SnapshotResponse, Status>>,
156    ),
157
158    // Request snapshot stream from Leader
159    StreamSnapshot(Box<tonic::Streaming<SnapshotAck>>, StreamResponseSender),
160
161    JoinCluster(
162        JoinRequest,
163        MaybeCloneOneshotSender<std::result::Result<JoinResponse, Status>>,
164    ),
165
166    DiscoverLeader(
167        LeaderDiscoveryRequest,
168        MaybeCloneOneshotSender<std::result::Result<LeaderDiscoveryResponse, Status>>,
169    ),
170
171    CreateSnapshotEvent,
172
173    LogPurgeCompleted(LogId),
174
175    SnapshotCreated(Result<(SnapshotMetadata, PathBuf)>),
176
177    // Lightweight promotion trigger
178    PromoteReadyLearners,
179
180    /// Node removed itself from cluster membership
181    /// Leader must step down immediately after self-removal per Raft protocol
182    StepDownSelfRemoved,
183
184    /// Membership change has been applied to state
185    /// Leader should refresh cluster metadata cache
186    MembershipApplied,
187
188    /// State machine apply failed - node must shutdown.
189    /// Conservative: treat all SM errors as fatal (future: distinguish fatal vs application errors).
190    FatalError {
191        source: String, // Error source
192        error: String,  // Error message
193    },
194}
195
/// Test-only, clone-able mirror of [`RaftEvent`] with reply senders and streams removed.
///
/// This lets tests capture and inspect events without owning the original channels.
#[cfg(test)]
#[derive(Debug, Clone)]
// Some variants (e.g. `ClientPropose`, `ClientReadRequest`, `ApplyCompleted`) are not
// produced by `raft_event_to_test_event` and exist only for test assertions.
#[allow(unused)]
pub enum TestEvent {
    ReceiveVoteRequest(VoteRequest),

    ClusterConf(MetadataRequest),

    ClusterConfUpdate(ClusterConfChangeRequest),

    AppendEntries(AppendEntriesRequest),

    ClientPropose(ClientWriteRequest),

    ClientReadRequest(ClientReadRequest),

    InstallSnapshotChunk,

    StreamSnapshot,

    JoinCluster(JoinRequest),

    DiscoverLeader(LeaderDiscoveryRequest),

    // Non-RPC events
    CreateSnapshotEvent,

    SnapshotCreated,

    LogPurgeCompleted(LogId),

    PromoteReadyLearners,

    /// Mirrors `RaftEvent::StepDownSelfRemoved`.
    StepDownSelfRemoved,

    /// Mirrors `RaftEvent::MembershipApplied`.
    MembershipApplied,

    FatalError {
        source: String,
        error: String,
    },

    ApplyCompleted {
        last_index: u64,
        results: Vec<ApplyResult>,
    },
}

/// Converts a [`RaftEvent`] into its [`TestEvent`] mirror.
///
/// Reply senders, snapshot streams and the `SnapshotCreated` result are dropped. Request
/// payloads and error strings are copied or cloned, so `event` is left untouched.
#[cfg(test)]
pub(crate) fn raft_event_to_test_event(event: &RaftEvent) -> TestEvent {
    match event {
        RaftEvent::ReceiveVoteRequest(req, _) => TestEvent::ReceiveVoteRequest(*req),
        RaftEvent::ClusterConf(req, _) => TestEvent::ClusterConf(*req),
        RaftEvent::ClusterConfUpdate(req, _) => TestEvent::ClusterConfUpdate(req.clone()),
        RaftEvent::AppendEntries(req, _) => TestEvent::AppendEntries(req.clone()),
        RaftEvent::InstallSnapshotChunk(_, _) => TestEvent::InstallSnapshotChunk,
        RaftEvent::StreamSnapshot(_, _) => TestEvent::StreamSnapshot,
        RaftEvent::JoinCluster(req, _) => TestEvent::JoinCluster(req.clone()),
        RaftEvent::DiscoverLeader(req, _) => TestEvent::DiscoverLeader(req.clone()),
        RaftEvent::CreateSnapshotEvent => TestEvent::CreateSnapshotEvent,
        RaftEvent::SnapshotCreated(_) => TestEvent::SnapshotCreated,
        RaftEvent::LogPurgeCompleted(id) => TestEvent::LogPurgeCompleted(*id),
        RaftEvent::PromoteReadyLearners => TestEvent::PromoteReadyLearners,
        // Dedicated variants: mapping these to a placeholder such as `CreateSnapshotEvent`
        // would make them indistinguishable from a genuine snapshot request in tests.
        RaftEvent::StepDownSelfRemoved => TestEvent::StepDownSelfRemoved,
        RaftEvent::MembershipApplied => TestEvent::MembershipApplied,
        RaftEvent::FatalError { source, error } => TestEvent::FatalError {
            source: source.clone(),
            error: error.clone(),
        },
    }
}