rocketmq_controller/processor/
mod.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18pub mod broker_processor;
19pub mod metadata_processor;
20pub mod request;
21pub mod topic_processor;
22
23use std::collections::HashMap;
24use std::sync::Arc;
25
26// Re-export processors
27pub use broker_processor::{
28    BrokerHeartbeatProcessor, ElectMasterProcessor, RegisterBrokerProcessor,
29    UnregisterBrokerProcessor,
30};
31pub use metadata_processor::GetMetadataProcessor;
32pub use request::RequestType;
33pub use request::*;
34pub use topic_processor::CreateTopicProcessor;
35pub use topic_processor::DeleteTopicProcessor;
36pub use topic_processor::UpdateTopicProcessor;
37use tracing::info;
38
39use crate::config::ControllerConfig;
40use crate::error::ControllerError;
41use crate::error::Result;
42use crate::metadata::MetadataStore;
43use crate::raft::RaftController;
44
45/// Request processor trait
46#[async_trait::async_trait]
47pub trait RequestProcessor: Send + Sync {
48    /// Process a request
49    async fn process(&self, request: &[u8]) -> Result<Vec<u8>>;
50}
51
52/// Processor manager
53///
54/// This component manages all request processors for handling
55/// RPC requests from brokers and clients.
56pub struct ProcessorManager {
57    /// Configuration
58    config: Arc<ControllerConfig>,
59
60    /// Raft controller
61    raft: Arc<RaftController>,
62
63    /// Metadata store
64    metadata: Arc<MetadataStore>,
65
66    /// Processor registry
67    processors: HashMap<RequestType, Arc<dyn RequestProcessor>>,
68}
69
70impl ProcessorManager {
71    /// Create a new processor manager
72    pub fn new(
73        config: Arc<ControllerConfig>,
74        raft: Arc<RaftController>,
75        metadata: Arc<MetadataStore>,
76    ) -> Self {
77        // Initialize processors
78        let mut processors: HashMap<RequestType, Arc<dyn RequestProcessor>> = HashMap::new();
79
80        // Register broker processors
81        processors.insert(
82            RequestType::RegisterBroker,
83            Arc::new(RegisterBrokerProcessor::new(metadata.clone(), raft.clone())),
84        );
85        processors.insert(
86            RequestType::UnregisterBroker,
87            Arc::new(UnregisterBrokerProcessor::new(
88                metadata.clone(),
89                raft.clone(),
90            )),
91        );
92        processors.insert(
93            RequestType::BrokerHeartbeat,
94            Arc::new(BrokerHeartbeatProcessor::new(metadata.clone())),
95        );
96        processors.insert(
97            RequestType::ElectMaster,
98            Arc::new(ElectMasterProcessor::new(metadata.clone(), raft.clone())),
99        );
100
101        // Register metadata processor
102        processors.insert(
103            RequestType::GetMetadata,
104            Arc::new(GetMetadataProcessor::new(metadata.clone())),
105        );
106
107        // Register topic processors
108        processors.insert(
109            RequestType::CreateTopic,
110            Arc::new(CreateTopicProcessor::new(metadata.clone(), raft.clone())),
111        );
112        processors.insert(
113            RequestType::UpdateTopic,
114            Arc::new(UpdateTopicProcessor::new(metadata.clone(), raft.clone())),
115        );
116        processors.insert(
117            RequestType::DeleteTopic,
118            Arc::new(DeleteTopicProcessor::new(metadata.clone(), raft.clone())),
119        );
120
121        Self {
122            config,
123            raft,
124            metadata,
125            processors,
126        }
127    }
128
129    /// Process a request
130    pub async fn process_request(&self, request_type: RequestType, data: &[u8]) -> Result<Vec<u8>> {
131        // Find the processor
132        let processor = self.processors.get(&request_type).ok_or_else(|| {
133            ControllerError::InvalidRequest(format!("Unknown request type: {:?}", request_type))
134        })?;
135
136        // Process the request
137        processor.process(data).await
138    }
139
140    /// Start the processor manager
141    pub async fn start(&self) -> Result<()> {
142        info!(
143            "Starting processor manager with {} processors",
144            self.processors.len()
145        );
146        // TODO: Start network server to handle incoming requests
147        Ok(())
148    }
149
150    /// Shutdown the processor manager
151    pub async fn shutdown(&self) -> Result<()> {
152        info!("Shutting down processor manager");
153        // TODO: Stop network server and cleanup
154        Ok(())
155    }
156}
157
158/*#[cfg(test)]
159mod tests {
160    #[tokio::test]
161    async fn test_processor_manager() {
162        // Placeholder test
163        assert!(true);
164    }
165}*/