d_engine_core/
event.rs

1use std::path::PathBuf;
2
3use d_engine_proto::client::ClientReadRequest;
4use d_engine_proto::client::ClientResponse;
5use d_engine_proto::client::ClientWriteRequest;
6use d_engine_proto::common::LogId;
7use d_engine_proto::server::cluster::ClusterConfChangeRequest;
8use d_engine_proto::server::cluster::ClusterConfUpdateResponse;
9use d_engine_proto::server::cluster::ClusterMembership;
10use d_engine_proto::server::cluster::JoinRequest;
11use d_engine_proto::server::cluster::JoinResponse;
12use d_engine_proto::server::cluster::LeaderDiscoveryRequest;
13use d_engine_proto::server::cluster::LeaderDiscoveryResponse;
14use d_engine_proto::server::cluster::MetadataRequest;
15use d_engine_proto::server::election::VoteRequest;
16use d_engine_proto::server::election::VoteResponse;
17use d_engine_proto::server::replication::AppendEntriesRequest;
18use d_engine_proto::server::replication::AppendEntriesResponse;
19use d_engine_proto::server::storage::PurgeLogRequest;
20use d_engine_proto::server::storage::PurgeLogResponse;
21use d_engine_proto::server::storage::SnapshotAck;
22use d_engine_proto::server::storage::SnapshotChunk;
23use d_engine_proto::server::storage::SnapshotMetadata;
24use d_engine_proto::server::storage::SnapshotResponse;
25use tonic::Status;
26
27use crate::MaybeCloneOneshotSender;
28use crate::Result;
29use crate::StreamResponseSender;
30
31#[derive(Debug, Clone)]
32pub struct NewCommitData {
33    pub new_commit_index: u64,
34    pub role: i32,
35    pub current_term: u64,
36}
37
38#[derive(Debug)]
39#[allow(dead_code)]
40pub enum RoleEvent {
41    BecomeFollower(Option<u32>), // BecomeFollower(Option<leader_id>)
42    BecomeCandidate,
43    BecomeLeader,
44    BecomeLearner,
45
46    NotifyNewCommitIndex(NewCommitData),
47
48    /// Notify when follower/learner confirms leader via committed vote
49    /// Triggered when committed vote changes from false to true
50    /// No state transition - pure notification for watch channel
51    LeaderDiscovered(u32, u64), // (leader_id, term)
52
53    ReprocessEvent(Box<RaftEvent>), //Replay the raft event when step down as another role
54}
55
56#[derive(Debug)]
57pub enum RaftEvent {
58    ReceiveVoteRequest(
59        VoteRequest,
60        MaybeCloneOneshotSender<std::result::Result<VoteResponse, Status>>,
61    ),
62
63    ClusterConf(
64        MetadataRequest,
65        MaybeCloneOneshotSender<std::result::Result<ClusterMembership, Status>>,
66    ),
67
68    ClusterConfUpdate(
69        ClusterConfChangeRequest,
70        MaybeCloneOneshotSender<std::result::Result<ClusterConfUpdateResponse, Status>>,
71    ),
72
73    AppendEntries(
74        AppendEntriesRequest,
75        MaybeCloneOneshotSender<std::result::Result<AppendEntriesResponse, Status>>,
76    ),
77
78    ClientPropose(
79        ClientWriteRequest,
80        MaybeCloneOneshotSender<std::result::Result<ClientResponse, Status>>,
81    ),
82
83    ClientReadRequest(
84        ClientReadRequest,
85        MaybeCloneOneshotSender<std::result::Result<ClientResponse, Status>>,
86    ),
87
88    // Response snapshot stream from Leader
89    InstallSnapshotChunk(
90        Box<tonic::Streaming<SnapshotChunk>>,
91        MaybeCloneOneshotSender<std::result::Result<SnapshotResponse, Status>>,
92    ),
93
94    // Request snapshot stream from Leader
95    StreamSnapshot(Box<tonic::Streaming<SnapshotAck>>, StreamResponseSender),
96
97    RaftLogCleanUp(
98        PurgeLogRequest,
99        MaybeCloneOneshotSender<std::result::Result<PurgeLogResponse, Status>>,
100    ),
101
102    JoinCluster(
103        JoinRequest,
104        MaybeCloneOneshotSender<std::result::Result<JoinResponse, Status>>,
105    ),
106
107    DiscoverLeader(
108        LeaderDiscoveryRequest,
109        MaybeCloneOneshotSender<std::result::Result<LeaderDiscoveryResponse, Status>>,
110    ),
111
112    // Leader connect with peers and push snapshot stream
113    #[allow(unused)]
114    TriggerSnapshotPush {
115        peer_id: u32,
116    },
117
118    CreateSnapshotEvent,
119
120    LogPurgeCompleted(LogId),
121
122    SnapshotCreated(Result<(SnapshotMetadata, PathBuf)>),
123
124    // Lightweight promotion trigger
125    PromoteReadyLearners,
126
127    /// Node removed itself from cluster membership
128    /// Leader must step down immediately after self-removal per Raft protocol
129    StepDownSelfRemoved,
130
131    /// Membership change has been applied to state
132    /// Leader should refresh cluster metadata cache
133    MembershipApplied,
134}
135
136#[cfg(any(test, feature = "test-utils"))]
137#[cfg_attr(any(test, feature = "test-utils"), derive(Debug, Clone))]
138#[allow(unused)]
139pub enum TestEvent {
140    ReceiveVoteRequest(VoteRequest),
141
142    ClusterConf(MetadataRequest),
143
144    ClusterConfUpdate(ClusterConfChangeRequest),
145
146    AppendEntries(AppendEntriesRequest),
147
148    ClientPropose(ClientWriteRequest),
149
150    ClientReadRequest(ClientReadRequest),
151
152    InstallSnapshotChunk,
153
154    StreamSnapshot,
155
156    RaftLogCleanUp(PurgeLogRequest),
157
158    JoinCluster(JoinRequest),
159
160    DiscoverLeader(LeaderDiscoveryRequest),
161
162    TriggerSnapshotPush { peer_id: u32 },
163
164    // None RPC event
165    CreateSnapshotEvent,
166
167    SnapshotCreated,
168
169    LogPurgeCompleted(LogId),
170
171    PromoteReadyLearners,
172}
173
174#[cfg(any(test, feature = "test-utils"))]
175pub fn raft_event_to_test_event(event: &RaftEvent) -> TestEvent {
176    match event {
177        RaftEvent::ReceiveVoteRequest(req, _) => TestEvent::ReceiveVoteRequest(*req),
178        RaftEvent::ClusterConf(req, _) => TestEvent::ClusterConf(*req),
179        RaftEvent::ClusterConfUpdate(req, _) => TestEvent::ClusterConfUpdate(req.clone()),
180        RaftEvent::AppendEntries(req, _) => TestEvent::AppendEntries(req.clone()),
181        RaftEvent::ClientPropose(req, _) => TestEvent::ClientPropose(req.clone()),
182        RaftEvent::ClientReadRequest(req, _) => TestEvent::ClientReadRequest(req.clone()),
183        RaftEvent::InstallSnapshotChunk(_, _) => TestEvent::InstallSnapshotChunk,
184        RaftEvent::StreamSnapshot(_, _) => TestEvent::StreamSnapshot,
185        RaftEvent::RaftLogCleanUp(req, _) => TestEvent::RaftLogCleanUp(req.clone()),
186        RaftEvent::JoinCluster(req, _) => TestEvent::JoinCluster(req.clone()),
187        RaftEvent::DiscoverLeader(req, _) => TestEvent::DiscoverLeader(req.clone()),
188        RaftEvent::CreateSnapshotEvent => TestEvent::CreateSnapshotEvent,
189        RaftEvent::SnapshotCreated(_result) => TestEvent::SnapshotCreated,
190        RaftEvent::LogPurgeCompleted(id) => TestEvent::LogPurgeCompleted(*id),
191        RaftEvent::TriggerSnapshotPush { peer_id } => {
192            TestEvent::TriggerSnapshotPush { peer_id: *peer_id }
193        }
194        RaftEvent::PromoteReadyLearners => TestEvent::PromoteReadyLearners,
195        RaftEvent::StepDownSelfRemoved => {
196            // StepDownSelfRemoved is handled at Raft level, not converted to TestEvent
197            // This is a control flow event, not a user-facing event
198            TestEvent::CreateSnapshotEvent // Placeholder - this event won't be emitted to tests
199        }
200        RaftEvent::MembershipApplied => {
201            // MembershipApplied is internal event for cache refresh
202            TestEvent::CreateSnapshotEvent // Placeholder
203        }
204    }
205}