rocketmq_controller/processor/
metadata_processor.rs1use std::sync::Arc;
19
20use serde_json::json;
21use tracing::info;
22
23use crate::error::ControllerError;
24use crate::error::Result;
25use crate::metadata::MetadataStore;
26use crate::processor::request::GetMetadataRequest;
27use crate::processor::request::GetMetadataResponse;
28use crate::processor::request::MetadataType;
29use crate::processor::RequestProcessor;
30
31pub struct GetMetadataProcessor {
33 metadata: Arc<MetadataStore>,
35}
36
37impl GetMetadataProcessor {
38 pub fn new(metadata: Arc<MetadataStore>) -> Self {
40 Self { metadata }
41 }
42
43 pub async fn process_request(
45 &self,
46 request: GetMetadataRequest,
47 ) -> Result<GetMetadataResponse> {
48 info!(
49 "Processing get metadata request, type: {:?}",
50 request.metadata_type
51 );
52
53 let (brokers, topics, configs) = match request.metadata_type {
54 MetadataType::Broker => {
55 let brokers = self.metadata.broker_manager().list_brokers().await;
56 (brokers, Vec::new(), json!({}))
57 }
58 MetadataType::Topic => {
59 let topics = self.metadata.topic_manager().list_topics().await;
60 (Vec::new(), topics, json!({}))
61 }
62 MetadataType::Config => {
63 let configs = self.metadata.config_manager().list_configs().await;
64 (Vec::new(), Vec::new(), json!(configs))
65 }
66 MetadataType::All => {
67 let brokers = self.metadata.broker_manager().list_brokers().await;
69 let topics = self.metadata.topic_manager().list_topics().await;
70 let configs = self.metadata.config_manager().list_configs().await;
71 (brokers, topics, json!(configs))
72 }
73 };
74
75 Ok(GetMetadataResponse {
76 success: true,
77 error: None,
78 brokers,
79 topics,
80 configs,
81 })
82 }
83}
84
85#[async_trait::async_trait]
86impl RequestProcessor for GetMetadataProcessor {
87 async fn process(&self, request: &[u8]) -> Result<Vec<u8>> {
88 let req: GetMetadataRequest = serde_json::from_slice(request)
89 .map_err(|e| ControllerError::InvalidRequest(e.to_string()))?;
90
91 let response = self.process_request(req).await?;
92
93 serde_json::to_vec(&response)
94 .map_err(|e| ControllerError::SerializationError(e.to_string()))
95 }
96}
97
98#[cfg(test)]
99mod tests {
100 use super::*;
101 use crate::config::ControllerConfig;
102
103 #[tokio::test]
104 async fn test_get_metadata_processor() {
105 let config = Arc::new(ControllerConfig::test_config());
106
107 let metadata = Arc::new(MetadataStore::new(config.clone()).await.unwrap());
108 let processor = GetMetadataProcessor::new(metadata);
109
110 let request = GetMetadataRequest {
111 metadata_type: MetadataType::All,
112 key: None,
113 };
114
115 let response = processor.process_request(request).await.unwrap();
116 assert!(response.success);
117 assert!(!response.brokers.is_empty() || response.brokers.is_empty());
118 assert!(!response.topics.is_empty() || response.topics.is_empty());
119 }
120}