Skip to main content

d_engine_core/
event.rs

1use std::path::PathBuf;
2
3use d_engine_proto::client::ClientReadRequest;
4use d_engine_proto::client::ClientResponse;
5use d_engine_proto::client::ClientWriteRequest;
6use d_engine_proto::common::LogId;
7use d_engine_proto::server::cluster::ClusterConfChangeRequest;
8use d_engine_proto::server::cluster::ClusterConfUpdateResponse;
9use d_engine_proto::server::cluster::ClusterMembership;
10use d_engine_proto::server::cluster::JoinRequest;
11use d_engine_proto::server::cluster::JoinResponse;
12use d_engine_proto::server::cluster::LeaderDiscoveryRequest;
13use d_engine_proto::server::cluster::LeaderDiscoveryResponse;
14use d_engine_proto::server::cluster::MetadataRequest;
15use d_engine_proto::server::election::VoteRequest;
16use d_engine_proto::server::election::VoteResponse;
17use d_engine_proto::server::replication::AppendEntriesRequest;
18use d_engine_proto::server::replication::AppendEntriesResponse;
19use d_engine_proto::server::storage::SnapshotAck;
20use d_engine_proto::server::storage::SnapshotChunk;
21use d_engine_proto::server::storage::SnapshotMetadata;
22use d_engine_proto::server::storage::SnapshotResponse;
23use tonic::Status;
24
25use crate::ApplyResult;
26use crate::MaybeCloneOneshotSender;
27use crate::Result;
28use crate::StreamResponseSender;
29
/// Commit-progress payload carried by [`RoleEvent::NotifyNewCommitIndex`].
///
/// All fields are plain integers, so the type is totally comparable (`Eq`) in addition to
/// `PartialEq`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NewCommitData {
    /// The commit index being announced.
    pub new_commit_index: u64,
    /// Role encoded as an integer.
    // NOTE(review): presumably the numeric value of the node-role enum — confirm against the
    // producer of this struct.
    pub role: i32,
    /// Term associated with this notification.
    pub current_term: u64,
}
36
37/// Client commands that require batching for performance
38/// Separated from internal RaftEvent for drain-driven processing
39#[derive(Debug)]
40pub enum ClientCmd {
41    Propose(
42        ClientWriteRequest,
43        MaybeCloneOneshotSender<std::result::Result<ClientResponse, Status>>,
44    ),
45    Read(
46        ClientReadRequest,
47        MaybeCloneOneshotSender<std::result::Result<ClientResponse, Status>>,
48    ),
49}
50
51#[derive(Debug)]
52#[allow(dead_code)]
53pub enum RoleEvent {
54    BecomeFollower(Option<u32>), // BecomeFollower(Option<leader_id>)
55    BecomeCandidate,
56    BecomeLeader,
57    BecomeLearner,
58
59    NotifyNewCommitIndex(NewCommitData),
60
61    /// Notify when follower/learner confirms leader via committed vote
62    /// Triggered when committed vote changes from false to true
63    /// No state transition - pure notification for watch channel
64    LeaderDiscovered(u32, u64), // (leader_id, term)
65
66    ReprocessEvent(Box<RaftEvent>), //Replay the raft event when step down as another role
67}
68
69#[derive(Debug)]
70pub enum RaftEvent {
71    ReceiveVoteRequest(
72        VoteRequest,
73        MaybeCloneOneshotSender<std::result::Result<VoteResponse, Status>>,
74    ),
75
76    ClusterConf(
77        MetadataRequest,
78        MaybeCloneOneshotSender<std::result::Result<ClusterMembership, Status>>,
79    ),
80
81    ClusterConfUpdate(
82        ClusterConfChangeRequest,
83        MaybeCloneOneshotSender<std::result::Result<ClusterConfUpdateResponse, Status>>,
84    ),
85
86    AppendEntries(
87        AppendEntriesRequest,
88        MaybeCloneOneshotSender<std::result::Result<AppendEntriesResponse, Status>>,
89    ),
90
91    // Response snapshot stream from Leader
92    InstallSnapshotChunk(
93        Box<tonic::Streaming<SnapshotChunk>>,
94        MaybeCloneOneshotSender<std::result::Result<SnapshotResponse, Status>>,
95    ),
96
97    // Request snapshot stream from Leader
98    StreamSnapshot(Box<tonic::Streaming<SnapshotAck>>, StreamResponseSender),
99
100    JoinCluster(
101        JoinRequest,
102        MaybeCloneOneshotSender<std::result::Result<JoinResponse, Status>>,
103    ),
104
105    DiscoverLeader(
106        LeaderDiscoveryRequest,
107        MaybeCloneOneshotSender<std::result::Result<LeaderDiscoveryResponse, Status>>,
108    ),
109
110    // Leader connect with peers and push snapshot stream
111    #[allow(unused)]
112    TriggerSnapshotPush {
113        peer_id: u32,
114    },
115
116    CreateSnapshotEvent,
117
118    LogPurgeCompleted(LogId),
119
120    SnapshotCreated(Result<(SnapshotMetadata, PathBuf)>),
121
122    // Lightweight promotion trigger
123    PromoteReadyLearners,
124
125    /// Node removed itself from cluster membership
126    /// Leader must step down immediately after self-removal per Raft protocol
127    StepDownSelfRemoved,
128
129    /// Membership change has been applied to state
130    /// Leader should refresh cluster metadata cache
131    MembershipApplied,
132
133    /// State machine apply failed - node must shutdown.
134    /// Conservative: treat all SM errors as fatal (future: distinguish fatal vs application errors).
135    FatalError {
136        source: String, // Error source
137        error: String,  // Error message
138    },
139
140    /// State machine apply completed
141    /// Sent by CommitHandler after applying entries to state machine
142    /// Contains results for each applied entry (e.g., CAS success/failure)
143    ApplyCompleted {
144        last_index: u64,
145        results: Vec<ApplyResult>,
146    },
147}
148
/// Test-only, cloneable mirror of [`RaftEvent`]: reply senders and streams are dropped and only
/// the cloneable payload (if any) is kept.
///
/// `ClientPropose` and `ClientReadRequest` have no `RaftEvent` counterpart and are never produced
/// by [`raft_event_to_test_event`].
#[cfg(test)]
#[derive(Debug, Clone)]
#[allow(unused)]
pub enum TestEvent {
    ReceiveVoteRequest(VoteRequest),

    ClusterConf(MetadataRequest),

    ClusterConfUpdate(ClusterConfChangeRequest),

    AppendEntries(AppendEntriesRequest),

    ClientPropose(ClientWriteRequest),

    ClientReadRequest(ClientReadRequest),

    /// Payload-free mirror: the chunk stream cannot be cloned.
    InstallSnapshotChunk,

    /// Payload-free mirror: the ack stream cannot be cloned.
    StreamSnapshot,

    JoinCluster(JoinRequest),

    DiscoverLeader(LeaderDiscoveryRequest),

    TriggerSnapshotPush {
        peer_id: u32,
    },

    // Non-RPC events
    CreateSnapshotEvent,

    /// Payload-free mirror: the creation result is not carried over.
    SnapshotCreated,

    LogPurgeCompleted(LogId),

    PromoteReadyLearners,

    /// Mirror of [`RaftEvent::StepDownSelfRemoved`].
    StepDownSelfRemoved,

    /// Mirror of [`RaftEvent::MembershipApplied`].
    MembershipApplied,

    FatalError {
        source: String,
        error: String,
    },

    ApplyCompleted {
        last_index: u64,
        results: Vec<ApplyResult>,
    },
}

/// Converts a borrowed [`RaftEvent`] into its [`TestEvent`] mirror.
///
/// Request payloads are copied or cloned; reply senders, streams and the snapshot-creation
/// result are discarded. Every `RaftEvent` variant maps to its own `TestEvent` variant, so a
/// test observing `TestEvent::CreateSnapshotEvent` can rely on it meaning exactly
/// `RaftEvent::CreateSnapshotEvent` (self-removal step-down and membership-applied events no
/// longer share that variant as a placeholder).
#[cfg(test)]
pub(crate) fn raft_event_to_test_event(event: &RaftEvent) -> TestEvent {
    match event {
        // Request-carrying events: keep the request, drop the reply sender.
        RaftEvent::ReceiveVoteRequest(req, _) => TestEvent::ReceiveVoteRequest(*req),
        RaftEvent::ClusterConf(req, _) => TestEvent::ClusterConf(*req),
        RaftEvent::ClusterConfUpdate(req, _) => TestEvent::ClusterConfUpdate(req.clone()),
        RaftEvent::AppendEntries(req, _) => TestEvent::AppendEntries(req.clone()),
        RaftEvent::JoinCluster(req, _) => TestEvent::JoinCluster(req.clone()),
        RaftEvent::DiscoverLeader(req, _) => TestEvent::DiscoverLeader(req.clone()),

        // Stream-carrying events: nothing cloneable to keep.
        RaftEvent::InstallSnapshotChunk(_, _) => TestEvent::InstallSnapshotChunk,
        RaftEvent::StreamSnapshot(_, _) => TestEvent::StreamSnapshot,

        // Internal notifications.
        RaftEvent::TriggerSnapshotPush { peer_id } => {
            TestEvent::TriggerSnapshotPush { peer_id: *peer_id }
        }
        RaftEvent::CreateSnapshotEvent => TestEvent::CreateSnapshotEvent,
        RaftEvent::SnapshotCreated(_result) => TestEvent::SnapshotCreated,
        RaftEvent::LogPurgeCompleted(id) => TestEvent::LogPurgeCompleted(*id),
        RaftEvent::PromoteReadyLearners => TestEvent::PromoteReadyLearners,
        RaftEvent::StepDownSelfRemoved => TestEvent::StepDownSelfRemoved,
        RaftEvent::MembershipApplied => TestEvent::MembershipApplied,
        RaftEvent::FatalError { source, error } => TestEvent::FatalError {
            source: source.clone(),
            error: error.clone(),
        },
        RaftEvent::ApplyCompleted {
            last_index,
            results,
        } => TestEvent::ApplyCompleted {
            last_index: *last_index,
            results: results.clone(),
        },
    }
}