rocketmq_controller/processor/
mod.rs1pub mod broker_processor;
19pub mod metadata_processor;
20pub mod request;
21pub mod topic_processor;
22
23use std::collections::HashMap;
24use std::sync::Arc;
25
26pub use broker_processor::{
28 BrokerHeartbeatProcessor, ElectMasterProcessor, RegisterBrokerProcessor,
29 UnregisterBrokerProcessor,
30};
31pub use metadata_processor::GetMetadataProcessor;
32pub use request::RequestType;
33pub use request::*;
34pub use topic_processor::CreateTopicProcessor;
35pub use topic_processor::DeleteTopicProcessor;
36pub use topic_processor::UpdateTopicProcessor;
37use tracing::info;
38
39use crate::config::ControllerConfig;
40use crate::error::ControllerError;
41use crate::error::Result;
42use crate::metadata::MetadataStore;
43use crate::raft::RaftController;
44
45#[async_trait::async_trait]
47pub trait RequestProcessor: Send + Sync {
48 async fn process(&self, request: &[u8]) -> Result<Vec<u8>>;
50}
51
52pub struct ProcessorManager {
57 config: Arc<ControllerConfig>,
59
60 raft: Arc<RaftController>,
62
63 metadata: Arc<MetadataStore>,
65
66 processors: HashMap<RequestType, Arc<dyn RequestProcessor>>,
68}
69
70impl ProcessorManager {
71 pub fn new(
73 config: Arc<ControllerConfig>,
74 raft: Arc<RaftController>,
75 metadata: Arc<MetadataStore>,
76 ) -> Self {
77 let mut processors: HashMap<RequestType, Arc<dyn RequestProcessor>> = HashMap::new();
79
80 processors.insert(
82 RequestType::RegisterBroker,
83 Arc::new(RegisterBrokerProcessor::new(metadata.clone(), raft.clone())),
84 );
85 processors.insert(
86 RequestType::UnregisterBroker,
87 Arc::new(UnregisterBrokerProcessor::new(
88 metadata.clone(),
89 raft.clone(),
90 )),
91 );
92 processors.insert(
93 RequestType::BrokerHeartbeat,
94 Arc::new(BrokerHeartbeatProcessor::new(metadata.clone())),
95 );
96 processors.insert(
97 RequestType::ElectMaster,
98 Arc::new(ElectMasterProcessor::new(metadata.clone(), raft.clone())),
99 );
100
101 processors.insert(
103 RequestType::GetMetadata,
104 Arc::new(GetMetadataProcessor::new(metadata.clone())),
105 );
106
107 processors.insert(
109 RequestType::CreateTopic,
110 Arc::new(CreateTopicProcessor::new(metadata.clone(), raft.clone())),
111 );
112 processors.insert(
113 RequestType::UpdateTopic,
114 Arc::new(UpdateTopicProcessor::new(metadata.clone(), raft.clone())),
115 );
116 processors.insert(
117 RequestType::DeleteTopic,
118 Arc::new(DeleteTopicProcessor::new(metadata.clone(), raft.clone())),
119 );
120
121 Self {
122 config,
123 raft,
124 metadata,
125 processors,
126 }
127 }
128
129 pub async fn process_request(&self, request_type: RequestType, data: &[u8]) -> Result<Vec<u8>> {
131 let processor = self.processors.get(&request_type).ok_or_else(|| {
133 ControllerError::InvalidRequest(format!("Unknown request type: {:?}", request_type))
134 })?;
135
136 processor.process(data).await
138 }
139
140 pub async fn start(&self) -> Result<()> {
142 info!(
143 "Starting processor manager with {} processors",
144 self.processors.len()
145 );
146 Ok(())
148 }
149
150 pub async fn shutdown(&self) -> Result<()> {
152 info!("Shutting down processor manager");
153 Ok(())
155 }
156}
157
158