rocketmq_controller/
manager.rs1use std::sync::Arc;
19
20use tokio::sync::RwLock;
21use tracing::error;
22use tracing::info;
23use tracing::warn;
24
25use crate::config::ControllerConfig;
26use crate::error::Result;
27use crate::metadata::MetadataStore;
28use crate::processor::ProcessorManager;
29use crate::raft::RaftController;
30use crate::rpc::RpcServer;
31
32pub struct ControllerManager {
42 config: Arc<ControllerConfig>,
44
45 raft: Arc<RaftController>,
47
48 metadata: Arc<MetadataStore>,
50
51 processor: Arc<ProcessorManager>,
53
54 rpc_server: Arc<RpcServer>,
56
57 running: Arc<RwLock<bool>>,
59}
60
61impl ControllerManager {
62 pub async fn new(config: ControllerConfig) -> Result<Self> {
64 let config = Arc::new(config);
65
66 info!("Initializing controller manager with config: {:?}", config);
67
68 let raft = Arc::new(RaftController::new(config.clone()).await?);
70
71 let metadata = Arc::new(MetadataStore::new(config.clone()).await?);
73
74 let processor = Arc::new(ProcessorManager::new(
76 config.clone(),
77 raft.clone(),
78 metadata.clone(),
79 ));
80
81 let rpc_server = Arc::new(RpcServer::new(config.listen_addr, processor.clone()));
83
84 Ok(Self {
85 config,
86 raft,
87 metadata,
88 processor,
89 rpc_server,
90 running: Arc::new(RwLock::new(false)),
91 })
92 }
93
94 pub async fn start(&self) -> Result<()> {
96 let mut running = self.running.write().await;
97 if *running {
98 warn!("Controller is already running");
99 return Ok(());
100 }
101
102 info!("Starting controller manager...");
103
104 self.raft.start().await?;
106
107 self.metadata.start().await?;
109
110 self.processor.start().await?;
112
113 self.rpc_server.start().await?;
115
116 *running = true;
117 info!("Controller manager started successfully");
118
119 Ok(())
120 }
121
122 pub async fn shutdown(&self) -> Result<()> {
124 let mut running = self.running.write().await;
125 if !*running {
126 warn!("Controller is not running");
127 return Ok(());
128 }
129
130 info!("Shutting down controller manager...");
131
132 if let Err(e) = self.rpc_server.shutdown().await {
134 error!("Failed to shutdown RPC server: {}", e);
135 }
136
137 if let Err(e) = self.processor.shutdown().await {
139 error!("Failed to shutdown processor: {}", e);
140 }
141
142 if let Err(e) = self.metadata.shutdown().await {
144 error!("Failed to shutdown metadata store: {}", e);
145 }
146
147 if let Err(e) = self.raft.shutdown().await {
149 error!("Failed to shutdown Raft controller: {}", e);
150 }
151
152 *running = false;
153 info!("Controller manager shut down successfully");
154
155 Ok(())
156 }
157
158 pub async fn is_leader(&self) -> bool {
160 self.raft.is_leader().await
161 }
162
163 pub async fn get_leader(&self) -> Option<u64> {
165 self.raft.get_leader().await
166 }
167
168 pub async fn is_running(&self) -> bool {
170 *self.running.read().await
171 }
172
173 pub fn raft(&self) -> &Arc<RaftController> {
175 &self.raft
176 }
177
178 pub fn metadata(&self) -> &Arc<MetadataStore> {
180 &self.metadata
181 }
182
183 pub fn processor(&self) -> &Arc<ProcessorManager> {
185 &self.processor
186 }
187}
188
189impl Drop for ControllerManager {
190 fn drop(&mut self) {
191 let running = self.running.clone();
193 let processor = self.processor.clone();
194 let metadata = self.metadata.clone();
195 let raft = self.raft.clone();
196
197 tokio::spawn(async move {
198 let is_running = *running.read().await;
199 if is_running {
200 warn!("Controller manager dropped while running, performing emergency shutdown");
201 let _ = processor.shutdown().await;
202 let _ = metadata.shutdown().await;
203 let _ = raft.shutdown().await;
204 }
205 });
206 }
207}
208
209