rocketmq_controller/raft/
node.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18use std::collections::HashMap;
19use std::sync::Arc;
20
21use raft::prelude::*;
22use raft::storage::MemStorage as RaftMemStorage;
23use raft::StateRole;
24use tokio::sync::RwLock;
25use tracing::debug;
26use tracing::info;
27
28use crate::config::ControllerConfig;
29use crate::error::ControllerError;
30use crate::error::Result;
31
32/// Type alias for proposal map to reduce complexity
33type ProposalMap = HashMap<Vec<u8>, tokio::sync::oneshot::Sender<Result<Vec<u8>>>>;
34
35/// Raft node wrapper
36pub struct RaftNode {
37    /// Node ID
38    id: u64,
39
40    /// Raft raw node
41    raw_node: Arc<RwLock<RawNode<RaftMemStorage>>>,
42
43    /// Pending proposals
44    proposals: Arc<RwLock<ProposalMap>>,
45}
46
47impl RaftNode {
48    /// Create a new Raft node
49    pub async fn new(id: u64, config: Arc<ControllerConfig>) -> Result<Self> {
50        // Create Raft configuration
51        let raft_config = Config {
52            id,
53            election_tick: (config.election_timeout_ms / 100) as usize,
54            heartbeat_tick: (config.heartbeat_interval_ms / 100) as usize,
55            max_size_per_msg: 1024 * 1024,
56            max_inflight_msgs: 256,
57            ..Default::default()
58        };
59        raft_config
60            .validate()
61            .map_err(|e| ControllerError::ConfigError(format!("Invalid Raft config: {:?}", e)))?;
62
63        // Create storage
64        let storage = RaftMemStorage::new();
65
66        // Initialize peers
67        let peers: Vec<u64> = config.raft_peers.iter().map(|p| p.id).collect();
68        if !peers.is_empty() {
69            let mut snapshot = Snapshot::default();
70            snapshot.mut_metadata().index = 0;
71            snapshot.mut_metadata().term = 0;
72            snapshot
73                .mut_metadata()
74                .mut_conf_state()
75                .voters
76                .clone_from(&peers);
77
78            storage
79                .wl()
80                .apply_snapshot(snapshot)
81                .map_err(|e| ControllerError::Raft(format!("Failed to apply snapshot: {:?}", e)))?;
82        }
83
84        // Create raw node
85        let raw_node = RawNode::new(&raft_config, storage, &slog_global::get_global())
86            .map_err(|e| ControllerError::Raft(format!("Failed to create RawNode: {:?}", e)))?;
87
88        info!("Created Raft node {} with peers: {:?}", id, peers);
89
90        Ok(Self {
91            id,
92            raw_node: Arc::new(RwLock::new(raw_node)),
93            proposals: Arc::new(RwLock::new(HashMap::new())),
94        })
95    }
96
97    /// Propose a new entry
98    pub async fn propose(&self, data: Vec<u8>) -> Result<Vec<u8>> {
99        let mut raw_node = self.raw_node.write().await;
100
101        // Check if we are the leader
102        if raw_node.raft.state != StateRole::Leader {
103            let leader = raw_node.raft.leader_id;
104            return Err(ControllerError::NotLeader {
105                leader_id: if leader == 0 { None } else { Some(leader) },
106            });
107        }
108
109        // Propose the entry
110        raw_node
111            .propose(vec![], data.clone())
112            .map_err(|e| ControllerError::Raft(format!("Failed to propose: {:?}", e)))?;
113
114        // For now, return immediately
115        // In a real implementation, we would wait for the entry to be committed
116        Ok(data)
117    }
118
119    /// Query current state (read-only)
120    pub async fn query(&self, _data: Vec<u8>) -> Result<Vec<u8>> {
121        let raw_node = self.raw_node.read().await;
122
123        // Check if we are the leader
124        if raw_node.raft.state != StateRole::Leader {
125            let leader = raw_node.raft.leader_id;
126            return Err(ControllerError::NotLeader {
127                leader_id: if leader == 0 { None } else { Some(leader) },
128            });
129        }
130
131        // For now, return empty response
132        // In a real implementation, we would query the state machine
133        Ok(vec![])
134    }
135
136    /// Step the Raft state machine with a message
137    pub async fn step(&self, message: Message) -> Result<()> {
138        let mut raw_node = self.raw_node.write().await;
139        raw_node
140            .step(message)
141            .map_err(|e| ControllerError::Raft(format!("Failed to step: {:?}", e)))?;
142        Ok(())
143    }
144
145    /// Tick the Raft state machine
146    pub async fn tick(&self) -> Result<()> {
147        let mut raw_node = self.raw_node.write().await;
148        raw_node.tick();
149
150        // Process ready
151        if raw_node.has_ready() {
152            let mut ready = raw_node.ready();
153
154            // Handle messages
155            if !ready.messages().is_empty() {
156                // In a real implementation, send these messages to peers
157                debug!("Need to send {} messages to peers", ready.messages().len());
158            }
159
160            // Handle committed entries
161            for entry in ready.take_committed_entries() {
162                if entry.data.is_empty() {
163                    // Empty entry, from leadership transfer
164                    continue;
165                }
166                // Apply to state machine
167                debug!("Applying entry: {:?}", entry);
168            }
169
170            // Advance the Raft
171            let light_rd = raw_node.advance(ready);
172            if let Some(commit) = light_rd.commit_index() {
173                raw_node.mut_store().wl().commit_to(commit).ok();
174            }
175            raw_node.advance_apply();
176        }
177
178        Ok(())
179    }
180
181    /// Check if this node is the leader
182    pub async fn is_leader(&self) -> bool {
183        let raw_node = self.raw_node.read().await;
184        raw_node.raft.state == StateRole::Leader
185    }
186
187    /// Get the current leader ID
188    pub async fn get_leader(&self) -> Option<u64> {
189        let raw_node = self.raw_node.read().await;
190        let leader = raw_node.raft.leader_id;
191        if leader == 0 {
192            None
193        } else {
194            Some(leader)
195        }
196    }
197}
198
199// Helper for slog logger
200mod slog_global {
201    use slog::o;
202    use slog::Discard;
203    use slog::Logger;
204
205    pub fn get_global() -> Logger {
206        Logger::root(Discard, o!())
207    }
208}