rocketmq_controller/processor/
metadata_processor.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18use std::sync::Arc;
19
20use serde_json::json;
21use tracing::info;
22
23use crate::error::ControllerError;
24use crate::error::Result;
25use crate::metadata::MetadataStore;
26use crate::processor::request::GetMetadataRequest;
27use crate::processor::request::GetMetadataResponse;
28use crate::processor::request::MetadataType;
29use crate::processor::RequestProcessor;
30
31/// Get metadata processor
32pub struct GetMetadataProcessor {
33    /// Metadata store
34    metadata: Arc<MetadataStore>,
35}
36
37impl GetMetadataProcessor {
38    /// Create a new get metadata processor
39    pub fn new(metadata: Arc<MetadataStore>) -> Self {
40        Self { metadata }
41    }
42
43    /// Process get metadata request
44    pub async fn process_request(
45        &self,
46        request: GetMetadataRequest,
47    ) -> Result<GetMetadataResponse> {
48        info!(
49            "Processing get metadata request, type: {:?}",
50            request.metadata_type
51        );
52
53        let (brokers, topics, configs) = match request.metadata_type {
54            MetadataType::Broker => {
55                let brokers = self.metadata.broker_manager().list_brokers().await;
56                (brokers, Vec::new(), json!({}))
57            }
58            MetadataType::Topic => {
59                let topics = self.metadata.topic_manager().list_topics().await;
60                (Vec::new(), topics, json!({}))
61            }
62            MetadataType::Config => {
63                let configs = self.metadata.config_manager().list_configs().await;
64                (Vec::new(), Vec::new(), json!(configs))
65            }
66            MetadataType::All => {
67                // Get all metadata
68                let brokers = self.metadata.broker_manager().list_brokers().await;
69                let topics = self.metadata.topic_manager().list_topics().await;
70                let configs = self.metadata.config_manager().list_configs().await;
71                (brokers, topics, json!(configs))
72            }
73        };
74
75        Ok(GetMetadataResponse {
76            success: true,
77            error: None,
78            brokers,
79            topics,
80            configs,
81        })
82    }
83}
84
85#[async_trait::async_trait]
86impl RequestProcessor for GetMetadataProcessor {
87    async fn process(&self, request: &[u8]) -> Result<Vec<u8>> {
88        let req: GetMetadataRequest = serde_json::from_slice(request)
89            .map_err(|e| ControllerError::InvalidRequest(e.to_string()))?;
90
91        let response = self.process_request(req).await?;
92
93        serde_json::to_vec(&response)
94            .map_err(|e| ControllerError::SerializationError(e.to_string()))
95    }
96}
97
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::ControllerConfig;

    /// Exercises `process_request` for every `MetadataType` against a single
    /// store built from the test configuration.
    ///
    /// The contents of a fresh store are not assumed. The checks only cover
    /// what `process_request` guarantees on its own: a successful response
    /// with no error, and empty values for categories that were not requested.
    #[tokio::test]
    async fn test_get_metadata_processor() {
        let config = Arc::new(ControllerConfig::test_config());

        let metadata = Arc::new(
            MetadataStore::new(config)
                .await
                .expect("metadata store should initialise from the test config"),
        );
        let processor = GetMetadataProcessor::new(metadata);

        // All: every category is fetched; only the envelope can be checked.
        let all = processor
            .process_request(GetMetadataRequest {
                metadata_type: MetadataType::All,
                key: None,
            })
            .await
            .expect("All request should succeed");
        assert!(all.success);
        assert!(all.error.is_none());

        // Broker: topics and configs must be left empty.
        let brokers_only = processor
            .process_request(GetMetadataRequest {
                metadata_type: MetadataType::Broker,
                key: None,
            })
            .await
            .expect("Broker request should succeed");
        assert!(brokers_only.success);
        assert!(brokers_only.error.is_none());
        assert!(brokers_only.topics.is_empty());
        assert_eq!(brokers_only.configs, json!({}));

        // Topic: brokers and configs must be left empty.
        let topics_only = processor
            .process_request(GetMetadataRequest {
                metadata_type: MetadataType::Topic,
                key: None,
            })
            .await
            .expect("Topic request should succeed");
        assert!(topics_only.success);
        assert!(topics_only.error.is_none());
        assert!(topics_only.brokers.is_empty());
        assert_eq!(topics_only.configs, json!({}));

        // Config: brokers and topics must be left empty.
        let configs_only = processor
            .process_request(GetMetadataRequest {
                metadata_type: MetadataType::Config,
                key: None,
            })
            .await
            .expect("Config request should succeed");
        assert!(configs_only.success);
        assert!(configs_only.error.is_none());
        assert!(configs_only.brokers.is_empty());
        assert!(configs_only.topics.is_empty());
    }
}