iggy_common/commands/topics/
create_topic.rs1use super::{MAX_NAME_LENGTH, MAX_PARTITIONS_COUNT};
20use crate::BytesSerializable;
21use crate::CompressionAlgorithm;
22use crate::Identifier;
23use crate::Sizeable;
24use crate::Validatable;
25use crate::error::IggyError;
26use crate::utils::expiry::IggyExpiry;
27use crate::utils::topic_size::MaxTopicSize;
28use crate::{CREATE_TOPIC_CODE, Command};
29use bytes::{BufMut, Bytes, BytesMut};
30use serde::{Deserialize, Serialize};
31use std::fmt::Display;
32use std::str::from_utf8;
33
34#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
44pub struct CreateTopic {
45 #[serde(skip)]
47 pub stream_id: Identifier,
48 pub partitions_count: u32,
50 pub compression_algorithm: CompressionAlgorithm,
52 pub message_expiry: IggyExpiry,
54 pub max_topic_size: MaxTopicSize,
57 pub replication_factor: Option<u8>,
59 pub name: String,
61}
62
63impl Command for CreateTopic {
64 fn code(&self) -> u32 {
65 CREATE_TOPIC_CODE
66 }
67}
68
69impl Default for CreateTopic {
70 fn default() -> Self {
71 CreateTopic {
72 stream_id: Identifier::default(),
73 partitions_count: 1,
74 compression_algorithm: CompressionAlgorithm::None,
75 message_expiry: IggyExpiry::NeverExpire,
76 max_topic_size: MaxTopicSize::ServerDefault,
77 replication_factor: None,
78 name: "topic".to_string(),
79 }
80 }
81}
82
83impl Validatable<IggyError> for CreateTopic {
84 fn validate(&self) -> Result<(), IggyError> {
85 if self.name.is_empty() || self.name.len() > MAX_NAME_LENGTH {
86 return Err(IggyError::InvalidTopicName);
87 }
88
89 if !(0..=MAX_PARTITIONS_COUNT).contains(&self.partitions_count) {
90 return Err(IggyError::TooManyPartitions);
91 }
92
93 if let Some(replication_factor) = self.replication_factor
94 && replication_factor == 0
95 {
96 return Err(IggyError::InvalidReplicationFactor);
97 }
98
99 Ok(())
100 }
101}
102
103impl BytesSerializable for CreateTopic {
104 fn to_bytes(&self) -> Bytes {
105 let stream_id_bytes = self.stream_id.to_bytes();
106 let mut bytes = BytesMut::with_capacity(19 + stream_id_bytes.len() + self.name.len());
107 bytes.put_slice(&stream_id_bytes);
108 bytes.put_u32_le(self.partitions_count);
109 bytes.put_u8(self.compression_algorithm.as_code());
110 bytes.put_u64_le(self.message_expiry.into());
111 bytes.put_u64_le(self.max_topic_size.into());
112 match self.replication_factor {
113 Some(replication_factor) => bytes.put_u8(replication_factor),
114 None => bytes.put_u8(0),
115 }
116 #[allow(clippy::cast_possible_truncation)]
117 bytes.put_u8(self.name.len() as u8);
118 bytes.put_slice(self.name.as_bytes());
119 bytes.freeze()
120 }
121
122 fn from_bytes(bytes: Bytes) -> std::result::Result<CreateTopic, IggyError> {
123 if bytes.len() < 14 {
124 return Err(IggyError::InvalidCommand);
125 }
126 let mut position = 0;
127 let stream_id = Identifier::from_bytes(bytes.clone())?;
128 position += stream_id.get_size_bytes().as_bytes_usize();
129 let partitions_count = u32::from_le_bytes(
130 bytes
131 .get(position..position + 4)
132 .ok_or(IggyError::InvalidCommand)?
133 .try_into()
134 .map_err(|_| IggyError::InvalidNumberEncoding)?,
135 );
136 let compression_algorithm = CompressionAlgorithm::from_code(
137 *bytes.get(position + 4).ok_or(IggyError::InvalidCommand)?,
138 )?;
139 let message_expiry = u64::from_le_bytes(
140 bytes
141 .get(position + 5..position + 13)
142 .ok_or(IggyError::InvalidCommand)?
143 .try_into()
144 .map_err(|_| IggyError::InvalidNumberEncoding)?,
145 );
146 let message_expiry: IggyExpiry = message_expiry.into();
147 let max_topic_size = u64::from_le_bytes(
148 bytes
149 .get(position + 13..position + 21)
150 .ok_or(IggyError::InvalidCommand)?
151 .try_into()
152 .map_err(|_| IggyError::InvalidNumberEncoding)?,
153 );
154 let max_topic_size: MaxTopicSize = max_topic_size.into();
155 let replication_factor = match *bytes.get(position + 21).ok_or(IggyError::InvalidCommand)? {
156 0 => None,
157 factor => Some(factor),
158 };
159 let name_length = *bytes.get(position + 22).ok_or(IggyError::InvalidCommand)? as usize;
160 let name = from_utf8(
161 bytes
162 .get(position + 23..position + 23 + name_length)
163 .ok_or(IggyError::InvalidCommand)?,
164 )
165 .map_err(|_| IggyError::InvalidUtf8)?
166 .to_string();
167 let command = CreateTopic {
168 stream_id,
169 partitions_count,
170 compression_algorithm,
171 message_expiry,
172 max_topic_size,
173 replication_factor,
174 name,
175 };
176 Ok(command)
177 }
178}
179
180impl Display for CreateTopic {
181 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
182 write!(
183 f,
184 "{}|{}|{}|{}|{}|{}",
185 self.stream_id,
186 self.partitions_count,
187 self.message_expiry,
188 self.max_topic_size,
189 self.replication_factor.unwrap_or(0),
190 self.name
191 )
192 }
193}
194
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::BufMut;

    /// Serializes a command and decodes the raw bytes field by field to check
    /// that every field lands at the expected offset.
    #[test]
    fn should_be_serialized_as_bytes() {
        let command = CreateTopic {
            stream_id: Identifier::numeric(1).unwrap(),
            partitions_count: 3,
            message_expiry: IggyExpiry::NeverExpire,
            compression_algorithm: CompressionAlgorithm::None,
            max_topic_size: MaxTopicSize::ServerDefault,
            replication_factor: Some(1),
            name: "test".to_string(),
        };
        let bytes = command.to_bytes();

        let stream_id = Identifier::from_bytes(bytes.clone()).unwrap();
        // Fixed-width fields start right after the encoded identifier.
        let position = stream_id.get_size_bytes().as_bytes_usize();
        let partitions_count =
            u32::from_le_bytes(bytes[position..position + 4].try_into().unwrap());
        let compression_algorithm = CompressionAlgorithm::from_code(bytes[position + 4]).unwrap();
        let message_expiry: IggyExpiry =
            u64::from_le_bytes(bytes[position + 5..position + 13].try_into().unwrap()).into();
        let max_topic_size: MaxTopicSize =
            u64::from_le_bytes(bytes[position + 13..position + 21].try_into().unwrap()).into();
        let replication_factor = bytes[position + 21];
        let name_length = bytes[position + 22];
        let name = from_utf8(&bytes[position + 23..(position + 23 + name_length as usize)])
            .unwrap()
            .to_string();

        assert!(!bytes.is_empty());
        assert_eq!(stream_id, command.stream_id);
        assert_eq!(partitions_count, command.partitions_count);
        assert_eq!(compression_algorithm, command.compression_algorithm);
        assert_eq!(message_expiry, command.message_expiry);
        assert_eq!(max_topic_size, command.max_topic_size);
        assert_eq!(replication_factor, command.replication_factor.unwrap());
        // Check the length prefix actually written on the wire, not just the decoded name.
        assert_eq!(name_length as usize, command.name.len());
        assert_eq!(name, command.name);
    }

    #[test]
    fn from_bytes_should_fail_on_empty_input() {
        assert!(CreateTopic::from_bytes(Bytes::new()).is_err());
    }

    /// Every strict prefix of a valid payload must be rejected, including the
    /// one that is missing only the final byte.
    #[test]
    fn from_bytes_should_fail_on_truncated_input() {
        let command = CreateTopic {
            stream_id: Identifier::numeric(1).unwrap(),
            partitions_count: 3,
            compression_algorithm: CompressionAlgorithm::None,
            message_expiry: IggyExpiry::NeverExpire,
            max_topic_size: MaxTopicSize::ServerDefault,
            replication_factor: Some(1),
            name: "test".to_string(),
        };
        let bytes = command.to_bytes();
        // `i` is the truncated length: 0..len covers every strict prefix.
        for i in 0..bytes.len() {
            let truncated = bytes.slice(..i);
            assert!(
                CreateTopic::from_bytes(truncated).is_err(),
                "expected error for truncation at byte {i}"
            );
        }
    }

    /// Hand-builds a payload in wire order and checks that `from_bytes`
    /// recovers every field.
    #[test]
    fn should_be_deserialized_from_bytes() {
        let stream_id = Identifier::numeric(1).unwrap();
        let partitions_count = 3u32;
        let compression_algorithm = CompressionAlgorithm::None;
        let name = "test".to_string();
        let message_expiry = IggyExpiry::NeverExpire;
        let max_topic_size = MaxTopicSize::ServerDefault;
        let replication_factor = 1;
        let stream_id_bytes = stream_id.to_bytes();
        // 23 = 4 + 1 + 8 + 8 + 1 + 1 bytes of fixed-width fields.
        let mut bytes = BytesMut::with_capacity(23 + stream_id_bytes.len() + name.len());
        bytes.put_slice(&stream_id_bytes);
        bytes.put_u32_le(partitions_count);
        bytes.put_u8(compression_algorithm.as_code());
        bytes.put_u64_le(message_expiry.into());
        bytes.put_u64_le(max_topic_size.as_bytes_u64());
        bytes.put_u8(replication_factor);
        #[allow(clippy::cast_possible_truncation)]
        bytes.put_u8(name.len() as u8);
        bytes.put_slice(name.as_bytes());

        let command = CreateTopic::from_bytes(bytes.freeze());
        assert!(command.is_ok());

        let command = command.unwrap();
        assert_eq!(command.stream_id, stream_id);
        assert_eq!(command.name, name);
        assert_eq!(command.partitions_count, partitions_count);
        assert_eq!(command.compression_algorithm, compression_algorithm);
        assert_eq!(command.message_expiry, message_expiry);
        assert_eq!(command.max_topic_size, max_topic_size);
        assert_eq!(command.replication_factor.unwrap(), replication_factor);
    }
}