rocketmq_controller/processor/
request.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18use std::net::SocketAddr;
19
20use serde::Deserialize;
21use serde::Serialize;
22
23use crate::metadata::BrokerInfo;
24use crate::metadata::BrokerRole;
25use crate::metadata::TopicInfo;
26
27/// Request type enumeration
28#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
29pub enum RequestType {
30    RegisterBroker,
31    UnregisterBroker,
32    BrokerHeartbeat,
33    ElectMaster,
34    GetMetadata,
35    CreateTopic,
36    UpdateTopic,
37    DeleteTopic,
38    UpdateConfig,
39    GetConfig,
40}
41
42/// Register broker request
43#[derive(Debug, Clone, Serialize, Deserialize)]
44pub struct RegisterBrokerRequest {
45    /// Broker name
46    pub broker_name: String,
47
48    /// Broker ID
49    pub broker_id: u64,
50
51    /// Cluster name
52    pub cluster_name: String,
53
54    /// Broker address
55    pub broker_addr: SocketAddr,
56
57    /// Broker version
58    pub version: String,
59
60    /// Broker role
61    pub role: BrokerRole,
62
63    /// Additional metadata
64    pub metadata: serde_json::Value,
65}
66
67/// Register broker response
68#[derive(Debug, Clone, Serialize, Deserialize)]
69pub struct RegisterBrokerResponse {
70    /// Success flag
71    pub success: bool,
72
73    /// Error message if failed
74    pub error: Option<String>,
75
76    /// Assigned broker ID
77    pub broker_id: Option<u64>,
78}
79
80/// Unregister broker request
81#[derive(Debug, Clone, Serialize, Deserialize)]
82pub struct UnregisterBrokerRequest {
83    /// Broker name
84    pub broker_name: String,
85}
86
87/// Unregister broker response
88#[derive(Debug, Clone, Serialize, Deserialize)]
89pub struct UnregisterBrokerResponse {
90    /// Success flag
91    pub success: bool,
92
93    /// Error message if failed
94    pub error: Option<String>,
95}
96
97/// Broker heartbeat request
98#[derive(Debug, Clone, Serialize, Deserialize)]
99pub struct BrokerHeartbeatRequest {
100    /// Broker name
101    pub broker_name: String,
102
103    /// Timestamp
104    pub timestamp: u64,
105}
106
107/// Broker heartbeat response
108#[derive(Debug, Clone, Serialize, Deserialize)]
109pub struct BrokerHeartbeatResponse {
110    /// Success flag
111    pub success: bool,
112
113    /// Error message if failed
114    pub error: Option<String>,
115}
116
117/// Elect master request
118#[derive(Debug, Clone, Serialize, Deserialize)]
119pub struct ElectMasterRequest {
120    /// Cluster name
121    pub cluster_name: String,
122
123    /// Broker name
124    pub broker_name: String,
125}
126
127/// Elect master response
128#[derive(Debug, Clone, Serialize, Deserialize)]
129pub struct ElectMasterResponse {
130    /// Success flag
131    pub success: bool,
132
133    /// Error message if failed
134    pub error: Option<String>,
135
136    /// Master broker name
137    pub master_broker: Option<String>,
138
139    /// Master broker address
140    pub master_addr: Option<SocketAddr>,
141}
142
143/// Get metadata request
144#[derive(Debug, Clone, Serialize, Deserialize)]
145pub struct GetMetadataRequest {
146    /// Metadata type
147    pub metadata_type: MetadataType,
148
149    /// Filter key (optional)
150    pub key: Option<String>,
151}
152
153/// Metadata type
154#[derive(Debug, Clone, Serialize, Deserialize)]
155pub enum MetadataType {
156    Broker,
157    Topic,
158    Config,
159    All,
160}
161
162/// Get metadata response
163#[derive(Debug, Clone, Serialize, Deserialize)]
164pub struct GetMetadataResponse {
165    /// Success flag
166    pub success: bool,
167
168    /// Error message if failed
169    pub error: Option<String>,
170
171    /// Brokers
172    pub brokers: Vec<BrokerInfo>,
173
174    /// Topics
175    pub topics: Vec<TopicInfo>,
176
177    /// Configs
178    pub configs: serde_json::Value,
179}
180
181/// Create topic request
182#[derive(Debug, Clone, Serialize, Deserialize)]
183pub struct CreateTopicRequest {
184    /// Topic name
185    pub topic_name: String,
186
187    /// Read queue nums
188    pub read_queue_nums: u32,
189
190    /// Write queue nums
191    pub write_queue_nums: u32,
192
193    /// Permission
194    pub perm: u32,
195
196    /// Topic system flag
197    pub topic_sys_flag: u32,
198}
199
200/// Create topic response
201#[derive(Debug, Clone, Serialize, Deserialize)]
202pub struct CreateTopicResponse {
203    /// Success flag
204    pub success: bool,
205
206    /// Error message if failed
207    pub error: Option<String>,
208}
209
210/// Update topic request
211#[derive(Debug, Clone, Serialize, Deserialize)]
212pub struct UpdateTopicRequest {
213    /// Topic name
214    pub topic_name: String,
215
216    /// Topic info
217    pub topic_info: TopicInfo,
218}
219
220/// Update topic response
221#[derive(Debug, Clone, Serialize, Deserialize)]
222pub struct UpdateTopicResponse {
223    /// Success flag
224    pub success: bool,
225
226    /// Error message if failed
227    pub error: Option<String>,
228}
229
230/// Delete topic request
231#[derive(Debug, Clone, Serialize, Deserialize)]
232pub struct DeleteTopicRequest {
233    /// Topic name
234    pub topic_name: String,
235}
236
237/// Delete topic response
238#[derive(Debug, Clone, Serialize, Deserialize)]
239pub struct DeleteTopicResponse {
240    /// Success flag
241    pub success: bool,
242
243    /// Error message if failed
244    pub error: Option<String>,
245}
246
247#[cfg(test)]
248mod tests {
249    use super::*;
250
251    #[test]
252    fn test_request_serialization() {
253        let req = RegisterBrokerRequest {
254            broker_name: "broker-a".to_string(),
255            broker_id: 0,
256            cluster_name: "DefaultCluster".to_string(),
257            broker_addr: "127.0.0.1:10911".parse().unwrap(),
258            version: "5.0.0".to_string(),
259            role: BrokerRole::Master,
260            metadata: serde_json::json!({}),
261        };
262
263        let json = serde_json::to_string(&req).unwrap();
264        let deserialized: RegisterBrokerRequest = serde_json::from_str(&json).unwrap();
265        assert_eq!(req.broker_name, deserialized.broker_name);
266    }
267}