rocketmq_controller/raft/
network.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18use std::collections::HashMap;
19use std::sync::Arc;
20
21use raft::prelude::Message;
22use tokio::sync::mpsc;
23use tracing::debug;
24use tracing::error;
25use tracing::info;
26
27use crate::config::ControllerConfig;
28use crate::error::Result;
29use crate::raft::RaftTransport;
30
31/// Network manager for Raft communication
32///
33/// This component manages the network layer for Raft messages,
34/// handling both sending and receiving messages between nodes.
35pub struct NetworkManager {
36    /// Configuration
37    config: Arc<ControllerConfig>,
38
39    /// Transport layer
40    transport: Arc<RaftTransport>,
41
42    /// Message receiver for outgoing messages
43    outgoing_rx: Option<mpsc::UnboundedReceiver<Message>>,
44
45    /// Message sender for incoming messages
46    incoming_tx: mpsc::UnboundedSender<Message>,
47
48    /// Running state
49    running: Arc<tokio::sync::RwLock<bool>>,
50}
51
52impl NetworkManager {
53    /// Create a new network manager
54    pub fn new(config: Arc<ControllerConfig>) -> (Self, mpsc::UnboundedReceiver<Message>) {
55        // Build peer address map
56        let mut peer_addrs = HashMap::new();
57        for peer in &config.raft_peers {
58            peer_addrs.insert(peer.id, peer.addr);
59        }
60
61        // Create transport
62        let (transport, outgoing_rx, incoming_rx) =
63            RaftTransport::new(config.node_id, config.listen_addr, peer_addrs);
64
65        let incoming_tx = transport.message_sender();
66
67        let manager = Self {
68            config,
69            transport: Arc::new(transport),
70            outgoing_rx: Some(outgoing_rx),
71            incoming_tx,
72            running: Arc::new(tokio::sync::RwLock::new(false)),
73        };
74
75        (manager, incoming_rx)
76    }
77
78    /// Start the network manager
79    pub async fn start(&mut self) -> Result<()> {
80        let mut running = self.running.write().await;
81        if *running {
82            return Ok(());
83        }
84
85        info!("Starting network manager for node {}", self.config.node_id);
86
87        // Start transport
88        self.transport.clone().start().await?;
89
90        // Start outgoing message handler
91        if let Some(mut outgoing_rx) = self.outgoing_rx.take() {
92            let transport = self.transport.clone();
93            let running_clone = self.running.clone();
94
95            tokio::spawn(async move {
96                info!("Starting outgoing message handler");
97
98                while let Some(msg) = outgoing_rx.recv().await {
99                    if !*running_clone.read().await {
100                        break;
101                    }
102
103                    let to = msg.get_to();
104                    debug!(
105                        "Sending message to peer {}, type: {:?}",
106                        to,
107                        msg.get_msg_type()
108                    );
109
110                    if let Err(e) = transport.send_to_peer(to, msg).await {
111                        error!("Failed to send message to peer {}: {}", to, e);
112                    }
113                }
114
115                info!("Outgoing message handler stopped");
116            });
117        }
118
119        *running = true;
120        info!("Network manager started successfully");
121
122        Ok(())
123    }
124
125    /// Shutdown the network manager
126    pub async fn shutdown(&self) -> Result<()> {
127        let mut running = self.running.write().await;
128        if !*running {
129            return Ok(());
130        }
131
132        info!("Shutting down network manager");
133        *running = false;
134
135        Ok(())
136    }
137
138    /// Get the incoming message sender
139    pub fn incoming_sender(&self) -> mpsc::UnboundedSender<Message> {
140        self.incoming_tx.clone()
141    }
142
143    /// Send a message to a peer
144    pub async fn send_message(&self, msg: Message) -> Result<()> {
145        self.transport.send_to_peer(msg.get_to(), msg).await
146    }
147
148    /// Broadcast a message to all peers
149    pub async fn broadcast_message(&self, msg: Message) -> Result<()> {
150        self.transport.broadcast(msg).await
151    }
152}
153
154#[cfg(test)]
155mod tests {
156    use super::*;
157
158    #[tokio::test]
159    async fn test_network_manager_creation() {
160        let config = Arc::new(ControllerConfig::test_config());
161
162        let (manager, _rx) = NetworkManager::new(config);
163        assert!(!*manager.running.read().await);
164    }
165}