rocketmq_controller/
manager.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18use std::sync::Arc;
19
20use tokio::sync::RwLock;
21use tracing::error;
22use tracing::info;
23use tracing::warn;
24
25use crate::config::ControllerConfig;
26use crate::error::Result;
27use crate::metadata::MetadataStore;
28use crate::processor::ProcessorManager;
29use crate::raft::RaftController;
30use crate::rpc::RpcServer;
31
32/// Main controller manager
33///
34/// This is the central component that coordinates all controller operations.
35/// It manages:
36/// - Raft consensus layer
37/// - Metadata storage
38/// - Request processing
39/// - RPC server
40/// - Lifecycle management
41pub struct ControllerManager {
42    /// Configuration
43    config: Arc<ControllerConfig>,
44
45    /// Raft controller
46    raft: Arc<RaftController>,
47
48    /// Metadata store
49    metadata: Arc<MetadataStore>,
50
51    /// Request processor
52    processor: Arc<ProcessorManager>,
53
54    /// RPC server
55    rpc_server: Arc<RpcServer>,
56
57    /// Running state
58    running: Arc<RwLock<bool>>,
59}
60
61impl ControllerManager {
62    /// Create a new controller manager
63    pub async fn new(config: ControllerConfig) -> Result<Self> {
64        let config = Arc::new(config);
65
66        info!("Initializing controller manager with config: {:?}", config);
67
68        // Initialize Raft controller
69        let raft = Arc::new(RaftController::new(config.clone()).await?);
70
71        // Initialize metadata store
72        let metadata = Arc::new(MetadataStore::new(config.clone()).await?);
73
74        // Initialize processor manager
75        let processor = Arc::new(ProcessorManager::new(
76            config.clone(),
77            raft.clone(),
78            metadata.clone(),
79        ));
80
81        // Initialize RPC server
82        let rpc_server = Arc::new(RpcServer::new(config.listen_addr, processor.clone()));
83
84        Ok(Self {
85            config,
86            raft,
87            metadata,
88            processor,
89            rpc_server,
90            running: Arc::new(RwLock::new(false)),
91        })
92    }
93
94    /// Start the controller
95    pub async fn start(&self) -> Result<()> {
96        let mut running = self.running.write().await;
97        if *running {
98            warn!("Controller is already running");
99            return Ok(());
100        }
101
102        info!("Starting controller manager...");
103
104        // Start Raft controller
105        self.raft.start().await?;
106
107        // Start metadata store
108        self.metadata.start().await?;
109
110        // Start processor manager
111        self.processor.start().await?;
112
113        // Start RPC server
114        self.rpc_server.start().await?;
115
116        *running = true;
117        info!("Controller manager started successfully");
118
119        Ok(())
120    }
121
122    /// Shutdown the controller
123    pub async fn shutdown(&self) -> Result<()> {
124        let mut running = self.running.write().await;
125        if !*running {
126            warn!("Controller is not running");
127            return Ok(());
128        }
129
130        info!("Shutting down controller manager...");
131
132        // Shutdown RPC server first to stop accepting requests
133        if let Err(e) = self.rpc_server.shutdown().await {
134            error!("Failed to shutdown RPC server: {}", e);
135        }
136
137        // Shutdown processor
138        if let Err(e) = self.processor.shutdown().await {
139            error!("Failed to shutdown processor: {}", e);
140        }
141
142        // Shutdown metadata store
143        if let Err(e) = self.metadata.shutdown().await {
144            error!("Failed to shutdown metadata store: {}", e);
145        }
146
147        // Shutdown Raft last
148        if let Err(e) = self.raft.shutdown().await {
149            error!("Failed to shutdown Raft controller: {}", e);
150        }
151
152        *running = false;
153        info!("Controller manager shut down successfully");
154
155        Ok(())
156    }
157
158    /// Check if this node is the leader
159    pub async fn is_leader(&self) -> bool {
160        self.raft.is_leader().await
161    }
162
163    /// Get the current leader ID
164    pub async fn get_leader(&self) -> Option<u64> {
165        self.raft.get_leader().await
166    }
167
168    /// Check if the controller is running
169    pub async fn is_running(&self) -> bool {
170        *self.running.read().await
171    }
172
173    /// Get the Raft controller
174    pub fn raft(&self) -> &Arc<RaftController> {
175        &self.raft
176    }
177
178    /// Get the metadata store
179    pub fn metadata(&self) -> &Arc<MetadataStore> {
180        &self.metadata
181    }
182
183    /// Get the processor manager
184    pub fn processor(&self) -> &Arc<ProcessorManager> {
185        &self.processor
186    }
187}
188
189impl Drop for ControllerManager {
190    fn drop(&mut self) {
191        // Best effort shutdown on drop
192        let running = self.running.clone();
193        let processor = self.processor.clone();
194        let metadata = self.metadata.clone();
195        let raft = self.raft.clone();
196
197        tokio::spawn(async move {
198            let is_running = *running.read().await;
199            if is_running {
200                warn!("Controller manager dropped while running, performing emergency shutdown");
201                let _ = processor.shutdown().await;
202                let _ = metadata.shutdown().await;
203                let _ = raft.shutdown().await;
204            }
205        });
206    }
207}
208
209/*#[cfg(test)]
210mod tests {
211    #[tokio::test]
212    async fn test_manager_lifecycle() {
213        // This is a placeholder test
214        // Real tests will be added after implementing the dependencies
215        assert!(true);
216    }
217}
218*/