rocketmq_controller/raft/
network.rs1use std::collections::HashMap;
19use std::sync::Arc;
20
21use raft::prelude::Message;
22use tokio::sync::mpsc;
23use tracing::debug;
24use tracing::error;
25use tracing::info;
26
27use crate::config::ControllerConfig;
28use crate::error::Result;
29use crate::raft::RaftTransport;
30
31pub struct NetworkManager {
36 config: Arc<ControllerConfig>,
38
39 transport: Arc<RaftTransport>,
41
42 outgoing_rx: Option<mpsc::UnboundedReceiver<Message>>,
44
45 incoming_tx: mpsc::UnboundedSender<Message>,
47
48 running: Arc<tokio::sync::RwLock<bool>>,
50}
51
52impl NetworkManager {
53 pub fn new(config: Arc<ControllerConfig>) -> (Self, mpsc::UnboundedReceiver<Message>) {
55 let mut peer_addrs = HashMap::new();
57 for peer in &config.raft_peers {
58 peer_addrs.insert(peer.id, peer.addr);
59 }
60
61 let (transport, outgoing_rx, incoming_rx) =
63 RaftTransport::new(config.node_id, config.listen_addr, peer_addrs);
64
65 let incoming_tx = transport.message_sender();
66
67 let manager = Self {
68 config,
69 transport: Arc::new(transport),
70 outgoing_rx: Some(outgoing_rx),
71 incoming_tx,
72 running: Arc::new(tokio::sync::RwLock::new(false)),
73 };
74
75 (manager, incoming_rx)
76 }
77
78 pub async fn start(&mut self) -> Result<()> {
80 let mut running = self.running.write().await;
81 if *running {
82 return Ok(());
83 }
84
85 info!("Starting network manager for node {}", self.config.node_id);
86
87 self.transport.clone().start().await?;
89
90 if let Some(mut outgoing_rx) = self.outgoing_rx.take() {
92 let transport = self.transport.clone();
93 let running_clone = self.running.clone();
94
95 tokio::spawn(async move {
96 info!("Starting outgoing message handler");
97
98 while let Some(msg) = outgoing_rx.recv().await {
99 if !*running_clone.read().await {
100 break;
101 }
102
103 let to = msg.get_to();
104 debug!(
105 "Sending message to peer {}, type: {:?}",
106 to,
107 msg.get_msg_type()
108 );
109
110 if let Err(e) = transport.send_to_peer(to, msg).await {
111 error!("Failed to send message to peer {}: {}", to, e);
112 }
113 }
114
115 info!("Outgoing message handler stopped");
116 });
117 }
118
119 *running = true;
120 info!("Network manager started successfully");
121
122 Ok(())
123 }
124
125 pub async fn shutdown(&self) -> Result<()> {
127 let mut running = self.running.write().await;
128 if !*running {
129 return Ok(());
130 }
131
132 info!("Shutting down network manager");
133 *running = false;
134
135 Ok(())
136 }
137
138 pub fn incoming_sender(&self) -> mpsc::UnboundedSender<Message> {
140 self.incoming_tx.clone()
141 }
142
143 pub async fn send_message(&self, msg: Message) -> Result<()> {
145 self.transport.send_to_peer(msg.get_to(), msg).await
146 }
147
148 pub async fn broadcast_message(&self, msg: Message) -> Result<()> {
150 self.transport.broadcast(msg).await
151 }
152}
153
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly constructed manager must report that it is not running.
    #[tokio::test]
    async fn test_network_manager_creation() {
        let (manager, _rx) = NetworkManager::new(Arc::new(ControllerConfig::test_config()));

        let is_running = *manager.running.read().await;
        assert!(!is_running);
    }
}
165}