rocketmq_controller/metadata/
mod.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18mod broker;
19mod config;
20mod replica;
21mod topic;
22
23use std::sync::Arc;
24
25pub use broker::BrokerInfo;
26pub use broker::BrokerManager;
27pub use broker::BrokerRole;
28pub use config::ConfigInfo;
29pub use config::ConfigManager;
30pub use replica::BrokerReplicaInfo;
31pub use replica::ReplicaRole;
32pub use replica::ReplicasManager;
33pub use replica::SyncStateSet;
34pub use topic::TopicConfig;
35pub use topic::TopicInfo;
36pub use topic::TopicManager;
37use tracing::info;
38
39use crate::config::ControllerConfig;
40use crate::error::Result;
41
42/// Metadata store
43///
44/// This component manages all metadata for the controller:
45/// - Broker registration and heartbeat
46/// - Topic configuration
47/// - Controller configuration
48/// - Replica and ISR management
49///
50/// All metadata is replicated through Raft for consistency.
51pub struct MetadataStore {
52    /// Broker manager
53    broker_manager: Arc<BrokerManager>,
54
55    /// Topic manager
56    topic_manager: Arc<TopicManager>,
57
58    /// Config manager
59    config_manager: Arc<ConfigManager>,
60
61    /// Replicas manager
62    replicas_manager: Arc<ReplicasManager>,
63}
64
65impl MetadataStore {
66    /// Create a new metadata store
67    pub async fn new(config: Arc<ControllerConfig>) -> Result<Self> {
68        info!("Initializing metadata store");
69
70        let broker_manager = Arc::new(BrokerManager::new(config.clone()));
71        let topic_manager = Arc::new(TopicManager::new(config.clone()));
72        let config_manager = Arc::new(ConfigManager::new(config.clone()));
73        let replicas_manager = Arc::new(ReplicasManager::new(config));
74
75        Ok(Self {
76            broker_manager,
77            topic_manager,
78            config_manager,
79            replicas_manager,
80        })
81    }
82
83    /// Start the metadata store
84    pub async fn start(&self) -> Result<()> {
85        info!("Starting metadata store");
86        self.broker_manager.start().await?;
87        self.topic_manager.start().await?;
88        self.config_manager.start().await?;
89        self.replicas_manager.start().await?;
90        Ok(())
91    }
92
93    /// Shutdown the metadata store
94    pub async fn shutdown(&self) -> Result<()> {
95        info!("Shutting down metadata store");
96        self.broker_manager.shutdown().await?;
97        self.topic_manager.shutdown().await?;
98        self.config_manager.shutdown().await?;
99        self.replicas_manager.shutdown().await?;
100        Ok(())
101    }
102
103    /// Get the broker manager
104    pub fn broker_manager(&self) -> &Arc<BrokerManager> {
105        &self.broker_manager
106    }
107
108    /// Get the topic manager
109    pub fn topic_manager(&self) -> &Arc<TopicManager> {
110        &self.topic_manager
111    }
112
113    /// Get the config manager
114    pub fn config_manager(&self) -> &Arc<ConfigManager> {
115        &self.config_manager
116    }
117
118    /// Get the replicas manager
119    pub fn replicas_manager(&self) -> &Arc<ReplicasManager> {
120        &self.replicas_manager
121    }
122}
123
124/*#[cfg(test)]
125mod tests {
126    use super::*;
127
128    #[tokio::test]
129    async fn test_metadata_store() {
130        // Placeholder test
131        assert!(true);
132    }
133}*/