rocketmq_controller/processor/
topic_processor.rs1use std::sync::Arc;
19
20use tracing::error;
21use tracing::info;
22
23use crate::error::ControllerError;
24use crate::error::Result;
25use crate::metadata::MetadataStore;
26use crate::metadata::TopicConfig;
27use crate::processor::request::CreateTopicRequest;
28use crate::processor::request::CreateTopicResponse;
29use crate::processor::request::DeleteTopicRequest;
30use crate::processor::request::DeleteTopicResponse;
31use crate::processor::request::UpdateTopicRequest;
32use crate::processor::request::UpdateTopicResponse;
33use crate::processor::RequestProcessor;
34use crate::raft::RaftController;
35
36pub struct CreateTopicProcessor {
38 metadata: Arc<MetadataStore>,
40
41 raft: Arc<RaftController>,
43}
44
45impl CreateTopicProcessor {
46 pub fn new(metadata: Arc<MetadataStore>, raft: Arc<RaftController>) -> Self {
48 Self { metadata, raft }
49 }
50
51 pub async fn process_request(
53 &self,
54 request: CreateTopicRequest,
55 ) -> Result<CreateTopicResponse> {
56 info!("Processing create topic request: {}", request.topic_name);
57
58 if !self.raft.is_leader().await {
60 let leader = self.raft.get_leader().await;
61 error!("Not leader, current leader: {:?}", leader);
62 return Ok(CreateTopicResponse {
63 success: false,
64 error: Some(format!("Not leader, current leader: {:?}", leader)),
65 });
66 }
67
68 if request.read_queue_nums == 0 || request.write_queue_nums == 0 {
70 return Ok(CreateTopicResponse {
71 success: false,
72 error: Some("Queue nums must be greater than 0".to_string()),
73 });
74 }
75
76 let config = TopicConfig {
78 topic_name: request.topic_name.clone(),
79 read_queue_nums: request.read_queue_nums,
80 write_queue_nums: request.write_queue_nums,
81 perm: request.perm,
82 topic_filter_type: 0,
83 topic_sys_flag: request.topic_sys_flag,
84 order: false,
85 attributes: serde_json::Value::Null,
86 };
87
88 match self.metadata.topic_manager().create_topic(config).await {
90 Ok(()) => {
91 info!("Successfully created topic: {}", request.topic_name);
92 Ok(CreateTopicResponse {
93 success: true,
94 error: None,
95 })
96 }
97 Err(e) => {
98 error!("Failed to create topic {}: {}", request.topic_name, e);
99 Ok(CreateTopicResponse {
100 success: false,
101 error: Some(e.to_string()),
102 })
103 }
104 }
105 }
106}
107
108#[async_trait::async_trait]
109impl RequestProcessor for CreateTopicProcessor {
110 async fn process(&self, request: &[u8]) -> Result<Vec<u8>> {
111 let req: CreateTopicRequest = serde_json::from_slice(request)
112 .map_err(|e| ControllerError::InvalidRequest(e.to_string()))?;
113
114 let response = self.process_request(req).await?;
115
116 serde_json::to_vec(&response)
117 .map_err(|e| ControllerError::SerializationError(e.to_string()))
118 }
119}
120
121pub struct UpdateTopicProcessor {
123 metadata: Arc<MetadataStore>,
125
126 raft: Arc<RaftController>,
128}
129
130impl UpdateTopicProcessor {
131 pub fn new(metadata: Arc<MetadataStore>, raft: Arc<RaftController>) -> Self {
133 Self { metadata, raft }
134 }
135
136 pub async fn process_request(
138 &self,
139 request: UpdateTopicRequest,
140 ) -> Result<UpdateTopicResponse> {
141 info!("Processing update topic request: {}", request.topic_name);
142
143 if !self.raft.is_leader().await {
145 let leader = self.raft.get_leader().await;
146 return Ok(UpdateTopicResponse {
147 success: false,
148 error: Some(format!("Not leader, current leader: {:?}", leader)),
149 });
150 }
151
152 let config = TopicConfig {
154 topic_name: request.topic_name.clone(),
155 read_queue_nums: request.topic_info.read_queue_nums,
156 write_queue_nums: request.topic_info.write_queue_nums,
157 perm: request.topic_info.perm,
158 topic_filter_type: 0,
159 topic_sys_flag: request.topic_info.topic_sys_flag,
160 order: false,
161 attributes: request.topic_info.metadata.clone(),
162 };
163
164 match self.metadata.topic_manager().update_topic(config).await {
166 Ok(()) => {
167 info!("Successfully updated topic: {}", request.topic_name);
168 Ok(UpdateTopicResponse {
169 success: true,
170 error: None,
171 })
172 }
173 Err(e) => {
174 error!("Failed to update topic {}: {}", request.topic_name, e);
175 Ok(UpdateTopicResponse {
176 success: false,
177 error: Some(e.to_string()),
178 })
179 }
180 }
181 }
182}
183
184#[async_trait::async_trait]
185impl RequestProcessor for UpdateTopicProcessor {
186 async fn process(&self, request: &[u8]) -> Result<Vec<u8>> {
187 let req: UpdateTopicRequest = serde_json::from_slice(request)
188 .map_err(|e| ControllerError::InvalidRequest(e.to_string()))?;
189
190 let response = self.process_request(req).await?;
191
192 serde_json::to_vec(&response)
193 .map_err(|e| ControllerError::SerializationError(e.to_string()))
194 }
195}
196
197pub struct DeleteTopicProcessor {
199 metadata: Arc<MetadataStore>,
201
202 raft: Arc<RaftController>,
204}
205
206impl DeleteTopicProcessor {
207 pub fn new(metadata: Arc<MetadataStore>, raft: Arc<RaftController>) -> Self {
209 Self { metadata, raft }
210 }
211
212 pub async fn process_request(
214 &self,
215 request: DeleteTopicRequest,
216 ) -> Result<DeleteTopicResponse> {
217 info!("Processing delete topic request: {}", request.topic_name);
218
219 if !self.raft.is_leader().await {
221 let leader = self.raft.get_leader().await;
222 return Ok(DeleteTopicResponse {
223 success: false,
224 error: Some(format!("Not leader, current leader: {:?}", leader)),
225 });
226 }
227
228 match self
230 .metadata
231 .topic_manager()
232 .delete_topic(&request.topic_name)
233 .await
234 {
235 Ok(()) => {
236 info!("Successfully deleted topic: {}", request.topic_name);
237 Ok(DeleteTopicResponse {
238 success: true,
239 error: None,
240 })
241 }
242 Err(e) => {
243 error!("Failed to delete topic {}: {}", request.topic_name, e);
244 Ok(DeleteTopicResponse {
245 success: false,
246 error: Some(e.to_string()),
247 })
248 }
249 }
250 }
251}
252
253#[async_trait::async_trait]
254impl RequestProcessor for DeleteTopicProcessor {
255 async fn process(&self, request: &[u8]) -> Result<Vec<u8>> {
256 let req: DeleteTopicRequest = serde_json::from_slice(request)
257 .map_err(|e| ControllerError::InvalidRequest(e.to_string()))?;
258
259 let response = self.process_request(req).await?;
260
261 serde_json::to_vec(&response)
262 .map_err(|e| ControllerError::SerializationError(e.to_string()))
263 }
264}