rocketmq_controller/processor/
topic_processor.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18use std::sync::Arc;
19
20use tracing::error;
21use tracing::info;
22
23use crate::error::ControllerError;
24use crate::error::Result;
25use crate::metadata::MetadataStore;
26use crate::metadata::TopicConfig;
27use crate::processor::request::CreateTopicRequest;
28use crate::processor::request::CreateTopicResponse;
29use crate::processor::request::DeleteTopicRequest;
30use crate::processor::request::DeleteTopicResponse;
31use crate::processor::request::UpdateTopicRequest;
32use crate::processor::request::UpdateTopicResponse;
33use crate::processor::RequestProcessor;
34use crate::raft::RaftController;
35
36/// Create topic processor
37pub struct CreateTopicProcessor {
38    /// Metadata store
39    metadata: Arc<MetadataStore>,
40
41    /// Raft controller
42    raft: Arc<RaftController>,
43}
44
45impl CreateTopicProcessor {
46    /// Create a new create topic processor
47    pub fn new(metadata: Arc<MetadataStore>, raft: Arc<RaftController>) -> Self {
48        Self { metadata, raft }
49    }
50
51    /// Process create topic request
52    pub async fn process_request(
53        &self,
54        request: CreateTopicRequest,
55    ) -> Result<CreateTopicResponse> {
56        info!("Processing create topic request: {}", request.topic_name);
57
58        // Check if we are the leader
59        if !self.raft.is_leader().await {
60            let leader = self.raft.get_leader().await;
61            error!("Not leader, current leader: {:?}", leader);
62            return Ok(CreateTopicResponse {
63                success: false,
64                error: Some(format!("Not leader, current leader: {:?}", leader)),
65            });
66        }
67
68        // Validate request
69        if request.read_queue_nums == 0 || request.write_queue_nums == 0 {
70            return Ok(CreateTopicResponse {
71                success: false,
72                error: Some("Queue nums must be greater than 0".to_string()),
73            });
74        }
75
76        // Create topic config
77        let config = TopicConfig {
78            topic_name: request.topic_name.clone(),
79            read_queue_nums: request.read_queue_nums,
80            write_queue_nums: request.write_queue_nums,
81            perm: request.perm,
82            topic_filter_type: 0,
83            topic_sys_flag: request.topic_sys_flag,
84            order: false,
85            attributes: serde_json::Value::Null,
86        };
87
88        // Create topic
89        match self.metadata.topic_manager().create_topic(config).await {
90            Ok(()) => {
91                info!("Successfully created topic: {}", request.topic_name);
92                Ok(CreateTopicResponse {
93                    success: true,
94                    error: None,
95                })
96            }
97            Err(e) => {
98                error!("Failed to create topic {}: {}", request.topic_name, e);
99                Ok(CreateTopicResponse {
100                    success: false,
101                    error: Some(e.to_string()),
102                })
103            }
104        }
105    }
106}
107
108#[async_trait::async_trait]
109impl RequestProcessor for CreateTopicProcessor {
110    async fn process(&self, request: &[u8]) -> Result<Vec<u8>> {
111        let req: CreateTopicRequest = serde_json::from_slice(request)
112            .map_err(|e| ControllerError::InvalidRequest(e.to_string()))?;
113
114        let response = self.process_request(req).await?;
115
116        serde_json::to_vec(&response)
117            .map_err(|e| ControllerError::SerializationError(e.to_string()))
118    }
119}
120
121/// Update topic processor
122pub struct UpdateTopicProcessor {
123    /// Metadata store
124    metadata: Arc<MetadataStore>,
125
126    /// Raft controller
127    raft: Arc<RaftController>,
128}
129
130impl UpdateTopicProcessor {
131    /// Create a new update topic processor
132    pub fn new(metadata: Arc<MetadataStore>, raft: Arc<RaftController>) -> Self {
133        Self { metadata, raft }
134    }
135
136    /// Process update topic request
137    pub async fn process_request(
138        &self,
139        request: UpdateTopicRequest,
140    ) -> Result<UpdateTopicResponse> {
141        info!("Processing update topic request: {}", request.topic_name);
142
143        // Check if we are the leader
144        if !self.raft.is_leader().await {
145            let leader = self.raft.get_leader().await;
146            return Ok(UpdateTopicResponse {
147                success: false,
148                error: Some(format!("Not leader, current leader: {:?}", leader)),
149            });
150        }
151
152        // Create updated config
153        let config = TopicConfig {
154            topic_name: request.topic_name.clone(),
155            read_queue_nums: request.topic_info.read_queue_nums,
156            write_queue_nums: request.topic_info.write_queue_nums,
157            perm: request.topic_info.perm,
158            topic_filter_type: 0,
159            topic_sys_flag: request.topic_info.topic_sys_flag,
160            order: false,
161            attributes: request.topic_info.metadata.clone(),
162        };
163
164        // Update topic
165        match self.metadata.topic_manager().update_topic(config).await {
166            Ok(()) => {
167                info!("Successfully updated topic: {}", request.topic_name);
168                Ok(UpdateTopicResponse {
169                    success: true,
170                    error: None,
171                })
172            }
173            Err(e) => {
174                error!("Failed to update topic {}: {}", request.topic_name, e);
175                Ok(UpdateTopicResponse {
176                    success: false,
177                    error: Some(e.to_string()),
178                })
179            }
180        }
181    }
182}
183
184#[async_trait::async_trait]
185impl RequestProcessor for UpdateTopicProcessor {
186    async fn process(&self, request: &[u8]) -> Result<Vec<u8>> {
187        let req: UpdateTopicRequest = serde_json::from_slice(request)
188            .map_err(|e| ControllerError::InvalidRequest(e.to_string()))?;
189
190        let response = self.process_request(req).await?;
191
192        serde_json::to_vec(&response)
193            .map_err(|e| ControllerError::SerializationError(e.to_string()))
194    }
195}
196
197/// Delete topic processor
198pub struct DeleteTopicProcessor {
199    /// Metadata store
200    metadata: Arc<MetadataStore>,
201
202    /// Raft controller
203    raft: Arc<RaftController>,
204}
205
206impl DeleteTopicProcessor {
207    /// Create a new delete topic processor
208    pub fn new(metadata: Arc<MetadataStore>, raft: Arc<RaftController>) -> Self {
209        Self { metadata, raft }
210    }
211
212    /// Process delete topic request
213    pub async fn process_request(
214        &self,
215        request: DeleteTopicRequest,
216    ) -> Result<DeleteTopicResponse> {
217        info!("Processing delete topic request: {}", request.topic_name);
218
219        // Check if we are the leader
220        if !self.raft.is_leader().await {
221            let leader = self.raft.get_leader().await;
222            return Ok(DeleteTopicResponse {
223                success: false,
224                error: Some(format!("Not leader, current leader: {:?}", leader)),
225            });
226        }
227
228        // Delete topic
229        match self
230            .metadata
231            .topic_manager()
232            .delete_topic(&request.topic_name)
233            .await
234        {
235            Ok(()) => {
236                info!("Successfully deleted topic: {}", request.topic_name);
237                Ok(DeleteTopicResponse {
238                    success: true,
239                    error: None,
240                })
241            }
242            Err(e) => {
243                error!("Failed to delete topic {}: {}", request.topic_name, e);
244                Ok(DeleteTopicResponse {
245                    success: false,
246                    error: Some(e.to_string()),
247                })
248            }
249        }
250    }
251}
252
253#[async_trait::async_trait]
254impl RequestProcessor for DeleteTopicProcessor {
255    async fn process(&self, request: &[u8]) -> Result<Vec<u8>> {
256        let req: DeleteTopicRequest = serde_json::from_slice(request)
257            .map_err(|e| ControllerError::InvalidRequest(e.to_string()))?;
258
259        let response = self.process_request(req).await?;
260
261        serde_json::to_vec(&response)
262            .map_err(|e| ControllerError::SerializationError(e.to_string()))
263    }
264}