d_engine_core/
event.rs

1use std::path::PathBuf;
2
3use d_engine_proto::client::ClientReadRequest;
4use d_engine_proto::client::ClientResponse;
5use d_engine_proto::client::ClientWriteRequest;
6use d_engine_proto::common::LogId;
7use d_engine_proto::server::cluster::ClusterConfChangeRequest;
8use d_engine_proto::server::cluster::ClusterConfUpdateResponse;
9use d_engine_proto::server::cluster::ClusterMembership;
10use d_engine_proto::server::cluster::JoinRequest;
11use d_engine_proto::server::cluster::JoinResponse;
12use d_engine_proto::server::cluster::LeaderDiscoveryRequest;
13use d_engine_proto::server::cluster::LeaderDiscoveryResponse;
14use d_engine_proto::server::cluster::MetadataRequest;
15use d_engine_proto::server::election::VoteRequest;
16use d_engine_proto::server::election::VoteResponse;
17use d_engine_proto::server::replication::AppendEntriesRequest;
18use d_engine_proto::server::replication::AppendEntriesResponse;
19use d_engine_proto::server::storage::PurgeLogRequest;
20use d_engine_proto::server::storage::PurgeLogResponse;
21use d_engine_proto::server::storage::SnapshotAck;
22use d_engine_proto::server::storage::SnapshotChunk;
23use d_engine_proto::server::storage::SnapshotMetadata;
24use d_engine_proto::server::storage::SnapshotResponse;
25use tonic::Status;
26
27use crate::MaybeCloneOneshotSender;
28use crate::Result;
29use crate::StreamResponseSender;
30
/// Payload carried by [`RoleEvent::NotifyNewCommitIndex`].
///
/// Bundles the freshly advanced commit index with the role and term that
/// were recorded alongside it.
#[derive(Clone, Debug)]
pub struct NewCommitData {
    /// The commit index being announced.
    pub new_commit_index: u64,
    /// Numeric role code of the reporting node.
    /// NOTE(review): presumably maps onto the node-role enum used elsewhere — confirm.
    pub role: i32,
    /// Term associated with this commit notification.
    pub current_term: u64,
}
37
38#[derive(Debug)]
39#[allow(dead_code)]
40pub enum RoleEvent {
41    BecomeFollower(Option<u32>), // BecomeFollower(Option<leader_id>)
42    BecomeCandidate,
43    BecomeLeader,
44    BecomeLearner,
45
46    NotifyNewCommitIndex(NewCommitData),
47
48    /// Notify when follower/learner confirms leader via committed vote
49    /// Triggered when committed vote changes from false to true
50    /// No state transition - pure notification for watch channel
51    LeaderDiscovered(u32, u64), // (leader_id, term)
52
53    ReprocessEvent(Box<RaftEvent>), //Replay the raft event when step down as another role
54}
55
56#[derive(Debug)]
57pub enum RaftEvent {
58    ReceiveVoteRequest(
59        VoteRequest,
60        MaybeCloneOneshotSender<std::result::Result<VoteResponse, Status>>,
61    ),
62
63    ClusterConf(
64        MetadataRequest,
65        MaybeCloneOneshotSender<std::result::Result<ClusterMembership, Status>>,
66    ),
67
68    ClusterConfUpdate(
69        ClusterConfChangeRequest,
70        MaybeCloneOneshotSender<std::result::Result<ClusterConfUpdateResponse, Status>>,
71    ),
72
73    AppendEntries(
74        AppendEntriesRequest,
75        MaybeCloneOneshotSender<std::result::Result<AppendEntriesResponse, Status>>,
76    ),
77
78    ClientPropose(
79        ClientWriteRequest,
80        MaybeCloneOneshotSender<std::result::Result<ClientResponse, Status>>,
81    ),
82
83    ClientReadRequest(
84        ClientReadRequest,
85        MaybeCloneOneshotSender<std::result::Result<ClientResponse, Status>>,
86    ),
87
88    // Response snapshot stream from Leader
89    InstallSnapshotChunk(
90        Box<tonic::Streaming<SnapshotChunk>>,
91        MaybeCloneOneshotSender<std::result::Result<SnapshotResponse, Status>>,
92    ),
93
94    // Request snapshot stream from Leader
95    StreamSnapshot(Box<tonic::Streaming<SnapshotAck>>, StreamResponseSender),
96
97    RaftLogCleanUp(
98        PurgeLogRequest,
99        MaybeCloneOneshotSender<std::result::Result<PurgeLogResponse, Status>>,
100    ),
101
102    JoinCluster(
103        JoinRequest,
104        MaybeCloneOneshotSender<std::result::Result<JoinResponse, Status>>,
105    ),
106
107    DiscoverLeader(
108        LeaderDiscoveryRequest,
109        MaybeCloneOneshotSender<std::result::Result<LeaderDiscoveryResponse, Status>>,
110    ),
111
112    // Leader connect with peers and push snapshot stream
113    #[allow(unused)]
114    TriggerSnapshotPush {
115        peer_id: u32,
116    },
117
118    CreateSnapshotEvent,
119
120    LogPurgeCompleted(LogId),
121
122    SnapshotCreated(Result<(SnapshotMetadata, PathBuf)>),
123
124    // Lightweight promotion trigger
125    PromoteReadyLearners,
126
127    /// Node removed itself from cluster membership
128    /// Leader must step down immediately after self-removal per Raft protocol
129    StepDownSelfRemoved,
130
131    /// Membership change has been applied to state
132    /// Leader should refresh cluster metadata cache
133    MembershipApplied,
134
135    /// Signal to flush pending read requests
136    /// Sent by timeout task when read buffer reaches time threshold
137    FlushReadBuffer,
138}
139
/// Test-only mirror of [`RaftEvent`] that keeps just the cloneable request
/// payloads.
///
/// Reply senders and streams are dropped, so values of this type can be
/// cloned and inspected freely in test assertions. Every `RaftEvent` variant
/// has a dedicated counterpart here so that tests can always tell events
/// apart.
#[cfg(test)]
#[derive(Debug, Clone)]
#[allow(unused)]
pub enum TestEvent {
    /// Mirror of [`RaftEvent::ReceiveVoteRequest`].
    ReceiveVoteRequest(VoteRequest),

    /// Mirror of [`RaftEvent::ClusterConf`].
    ClusterConf(MetadataRequest),

    /// Mirror of [`RaftEvent::ClusterConfUpdate`].
    ClusterConfUpdate(ClusterConfChangeRequest),

    /// Mirror of [`RaftEvent::AppendEntries`].
    AppendEntries(AppendEntriesRequest),

    /// Mirror of [`RaftEvent::ClientPropose`].
    ClientPropose(ClientWriteRequest),

    /// Mirror of [`RaftEvent::ClientReadRequest`].
    ClientReadRequest(ClientReadRequest),

    /// Mirror of [`RaftEvent::InstallSnapshotChunk`]; the stream is not kept.
    InstallSnapshotChunk,

    /// Mirror of [`RaftEvent::StreamSnapshot`]; the stream is not kept.
    StreamSnapshot,

    /// Mirror of [`RaftEvent::RaftLogCleanUp`].
    RaftLogCleanUp(PurgeLogRequest),

    /// Mirror of [`RaftEvent::JoinCluster`].
    JoinCluster(JoinRequest),

    /// Mirror of [`RaftEvent::DiscoverLeader`].
    DiscoverLeader(LeaderDiscoveryRequest),

    /// Mirror of [`RaftEvent::TriggerSnapshotPush`].
    TriggerSnapshotPush { peer_id: u32 },

    // Non-RPC events below.
    /// Mirror of [`RaftEvent::CreateSnapshotEvent`].
    CreateSnapshotEvent,

    /// Mirror of [`RaftEvent::SnapshotCreated`]; the result is not kept.
    SnapshotCreated,

    /// Mirror of [`RaftEvent::LogPurgeCompleted`].
    LogPurgeCompleted(LogId),

    /// Mirror of [`RaftEvent::PromoteReadyLearners`].
    PromoteReadyLearners,

    /// Mirror of [`RaftEvent::StepDownSelfRemoved`].
    StepDownSelfRemoved,

    /// Mirror of [`RaftEvent::MembershipApplied`].
    MembershipApplied,

    /// Mirror of [`RaftEvent::FlushReadBuffer`].
    FlushReadBuffer,
}

/// Converts a borrowed [`RaftEvent`] into its [`TestEvent`] counterpart.
///
/// Request payloads are copied or cloned; reply senders, streams and the
/// snapshot-creation result are discarded. The mapping is one-to-one:
/// `StepDownSelfRemoved` and `MembershipApplied` map to their own variants
/// rather than to a `CreateSnapshotEvent` placeholder, so a test observing
/// `CreateSnapshotEvent` can trust that a snapshot was really requested.
#[cfg(test)]
pub(crate) fn raft_event_to_test_event(event: &RaftEvent) -> TestEvent {
    match event {
        RaftEvent::ReceiveVoteRequest(req, _) => TestEvent::ReceiveVoteRequest(*req),
        RaftEvent::ClusterConf(req, _) => TestEvent::ClusterConf(*req),
        RaftEvent::ClusterConfUpdate(req, _) => TestEvent::ClusterConfUpdate(req.clone()),
        RaftEvent::AppendEntries(req, _) => TestEvent::AppendEntries(req.clone()),
        RaftEvent::ClientPropose(req, _) => TestEvent::ClientPropose(req.clone()),
        RaftEvent::ClientReadRequest(req, _) => TestEvent::ClientReadRequest(req.clone()),
        RaftEvent::InstallSnapshotChunk(_, _) => TestEvent::InstallSnapshotChunk,
        RaftEvent::StreamSnapshot(_, _) => TestEvent::StreamSnapshot,
        RaftEvent::RaftLogCleanUp(req, _) => TestEvent::RaftLogCleanUp(req.clone()),
        RaftEvent::JoinCluster(req, _) => TestEvent::JoinCluster(req.clone()),
        RaftEvent::DiscoverLeader(req, _) => TestEvent::DiscoverLeader(req.clone()),
        RaftEvent::TriggerSnapshotPush { peer_id } => {
            TestEvent::TriggerSnapshotPush { peer_id: *peer_id }
        }
        RaftEvent::CreateSnapshotEvent => TestEvent::CreateSnapshotEvent,
        RaftEvent::SnapshotCreated(_result) => TestEvent::SnapshotCreated,
        RaftEvent::LogPurgeCompleted(id) => TestEvent::LogPurgeCompleted(*id),
        RaftEvent::PromoteReadyLearners => TestEvent::PromoteReadyLearners,
        RaftEvent::StepDownSelfRemoved => TestEvent::StepDownSelfRemoved,
        RaftEvent::MembershipApplied => TestEvent::MembershipApplied,
        RaftEvent::FlushReadBuffer => TestEvent::FlushReadBuffer,
    }
}