rocketmq_controller/metadata/
mod.rs1mod broker;
19mod config;
20mod replica;
21mod topic;
22
23use std::sync::Arc;
24
25pub use broker::BrokerInfo;
26pub use broker::BrokerManager;
27pub use broker::BrokerRole;
28pub use config::ConfigInfo;
29pub use config::ConfigManager;
30pub use replica::BrokerReplicaInfo;
31pub use replica::ReplicaRole;
32pub use replica::ReplicasManager;
33pub use replica::SyncStateSet;
34pub use topic::TopicConfig;
35pub use topic::TopicInfo;
36pub use topic::TopicManager;
37use tracing::info;
38
39use crate::config::ControllerConfig;
40use crate::error::Result;
41
42pub struct MetadataStore {
52 broker_manager: Arc<BrokerManager>,
54
55 topic_manager: Arc<TopicManager>,
57
58 config_manager: Arc<ConfigManager>,
60
61 replicas_manager: Arc<ReplicasManager>,
63}
64
65impl MetadataStore {
66 pub async fn new(config: Arc<ControllerConfig>) -> Result<Self> {
68 info!("Initializing metadata store");
69
70 let broker_manager = Arc::new(BrokerManager::new(config.clone()));
71 let topic_manager = Arc::new(TopicManager::new(config.clone()));
72 let config_manager = Arc::new(ConfigManager::new(config.clone()));
73 let replicas_manager = Arc::new(ReplicasManager::new(config));
74
75 Ok(Self {
76 broker_manager,
77 topic_manager,
78 config_manager,
79 replicas_manager,
80 })
81 }
82
83 pub async fn start(&self) -> Result<()> {
85 info!("Starting metadata store");
86 self.broker_manager.start().await?;
87 self.topic_manager.start().await?;
88 self.config_manager.start().await?;
89 self.replicas_manager.start().await?;
90 Ok(())
91 }
92
93 pub async fn shutdown(&self) -> Result<()> {
95 info!("Shutting down metadata store");
96 self.broker_manager.shutdown().await?;
97 self.topic_manager.shutdown().await?;
98 self.config_manager.shutdown().await?;
99 self.replicas_manager.shutdown().await?;
100 Ok(())
101 }
102
103 pub fn broker_manager(&self) -> &Arc<BrokerManager> {
105 &self.broker_manager
106 }
107
108 pub fn topic_manager(&self) -> &Arc<TopicManager> {
110 &self.topic_manager
111 }
112
113 pub fn config_manager(&self) -> &Arc<ConfigManager> {
115 &self.config_manager
116 }
117
118 pub fn replicas_manager(&self) -> &Arc<ReplicasManager> {
120 &self.replicas_manager
121 }
122}
123
124