iggy_common/commands/topics/
update_topic.rs1use super::MAX_NAME_LENGTH;
20use crate::BytesSerializable;
21use crate::CompressionAlgorithm;
22use crate::Identifier;
23use crate::Sizeable;
24use crate::Validatable;
25use crate::error::IggyError;
26use crate::utils::expiry::IggyExpiry;
27use crate::utils::topic_size::MaxTopicSize;
28use crate::{Command, UPDATE_TOPIC_CODE};
29use bytes::{BufMut, Bytes, BytesMut};
30use serde::{Deserialize, Serialize};
31use std::fmt::Display;
32use std::str::from_utf8;
33
34#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
44pub struct UpdateTopic {
45 #[serde(skip)]
47 pub stream_id: Identifier,
48 #[serde(skip)]
50 pub topic_id: Identifier,
51 pub compression_algorithm: CompressionAlgorithm,
53 pub message_expiry: IggyExpiry,
55 pub max_topic_size: MaxTopicSize,
58 pub replication_factor: Option<u8>,
60 pub name: String,
62}
63
64impl Command for UpdateTopic {
65 fn code(&self) -> u32 {
66 UPDATE_TOPIC_CODE
67 }
68}
69
70impl Default for UpdateTopic {
71 fn default() -> Self {
72 UpdateTopic {
73 stream_id: Identifier::default(),
74 topic_id: Identifier::default(),
75 compression_algorithm: Default::default(),
76 message_expiry: IggyExpiry::NeverExpire,
77 max_topic_size: MaxTopicSize::ServerDefault,
78 replication_factor: None,
79 name: "topic".to_string(),
80 }
81 }
82}
83
84impl Validatable<IggyError> for UpdateTopic {
85 fn validate(&self) -> Result<(), IggyError> {
86 if self.name.is_empty() || self.name.len() > MAX_NAME_LENGTH {
87 return Err(IggyError::InvalidTopicName);
88 }
89
90 if let Some(replication_factor) = self.replication_factor
91 && replication_factor == 0
92 {
93 return Err(IggyError::InvalidReplicationFactor);
94 }
95
96 Ok(())
97 }
98}
99
100impl BytesSerializable for UpdateTopic {
101 fn to_bytes(&self) -> Bytes {
102 let stream_id_bytes = self.stream_id.to_bytes();
103 let topic_id_bytes = self.topic_id.to_bytes();
104 let mut bytes = BytesMut::with_capacity(
105 19 + stream_id_bytes.len() + topic_id_bytes.len() + self.name.len(),
106 );
107 bytes.put_slice(&stream_id_bytes.clone());
108 bytes.put_slice(&topic_id_bytes.clone());
109 bytes.put_u8(self.compression_algorithm.as_code());
110 bytes.put_u64_le(self.message_expiry.into());
111 bytes.put_u64_le(self.max_topic_size.into());
112 match self.replication_factor {
113 Some(replication_factor) => bytes.put_u8(replication_factor),
114 None => bytes.put_u8(0),
115 }
116 #[allow(clippy::cast_possible_truncation)]
117 bytes.put_u8(self.name.len() as u8);
118 bytes.put_slice(self.name.as_bytes());
119 bytes.freeze()
120 }
121
122 fn from_bytes(bytes: Bytes) -> Result<UpdateTopic, IggyError> {
123 if bytes.len() < 21 {
124 return Err(IggyError::InvalidCommand);
125 }
126 let mut position = 0;
127 let stream_id = Identifier::from_bytes(bytes.clone())?;
128 position += stream_id.get_size_bytes().as_bytes_usize();
129 let topic_id = Identifier::from_bytes(bytes.slice(position..))?;
130 position += topic_id.get_size_bytes().as_bytes_usize();
131 let compression_algorithm = CompressionAlgorithm::from_code(
132 *bytes.get(position).ok_or(IggyError::InvalidCommand)?,
133 )?;
134 position += 1;
135 let message_expiry = u64::from_le_bytes(
136 bytes
137 .get(position..position + 8)
138 .ok_or(IggyError::InvalidCommand)?
139 .try_into()
140 .map_err(|_| IggyError::InvalidNumberEncoding)?,
141 );
142 let message_expiry: IggyExpiry = message_expiry.into();
143 let max_topic_size = u64::from_le_bytes(
144 bytes
145 .get(position + 8..position + 16)
146 .ok_or(IggyError::InvalidCommand)?
147 .try_into()
148 .map_err(|_| IggyError::InvalidNumberEncoding)?,
149 );
150 let max_topic_size: MaxTopicSize = max_topic_size.into();
151 let replication_factor = match *bytes.get(position + 16).ok_or(IggyError::InvalidCommand)? {
152 0 => None,
153 factor => Some(factor),
154 };
155 let name_length = *bytes.get(position + 17).ok_or(IggyError::InvalidCommand)? as usize;
156 let name = from_utf8(
157 bytes
158 .get(position + 18..position + 18 + name_length)
159 .ok_or(IggyError::InvalidCommand)?,
160 )
161 .map_err(|_| IggyError::InvalidUtf8)?
162 .to_string();
163 let command = UpdateTopic {
164 stream_id,
165 topic_id,
166 compression_algorithm,
167 message_expiry,
168 max_topic_size,
169 replication_factor,
170 name,
171 };
172 Ok(command)
173 }
174}
175
176impl Display for UpdateTopic {
177 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
178 write!(
179 f,
180 "{}|{}|{}|{}|{}|{}",
181 self.stream_id,
182 self.topic_id,
183 self.message_expiry,
184 self.max_topic_size,
185 self.replication_factor.unwrap_or(0),
186 self.name,
187 )
188 }
189}
190
191#[cfg(test)]
192mod tests {
193 use super::*;
194 use crate::utils::byte_size::IggyByteSize;
195 use bytes::BufMut;
196
197 #[test]
198 fn should_be_serialized_as_bytes() {
199 let command = UpdateTopic {
200 stream_id: Identifier::numeric(1).unwrap(),
201 topic_id: Identifier::numeric(2).unwrap(),
202 compression_algorithm: CompressionAlgorithm::None,
203 message_expiry: IggyExpiry::NeverExpire,
204 max_topic_size: MaxTopicSize::ServerDefault,
205 replication_factor: Some(1),
206 name: "test".to_string(),
207 };
208
209 let bytes = command.to_bytes();
210 let mut position = 0;
211 let stream_id = Identifier::from_bytes(bytes.clone()).unwrap();
212 position += stream_id.get_size_bytes().as_bytes_usize();
213 let topic_id = Identifier::from_bytes(bytes.slice(position..)).unwrap();
214 position += topic_id.get_size_bytes().as_bytes_usize();
215 let compression_algorithm = CompressionAlgorithm::from_code(bytes[position]).unwrap();
216 position += 1;
217 let message_expiry = u64::from_le_bytes(bytes[position..position + 8].try_into().unwrap());
218 let message_expiry: IggyExpiry = message_expiry.into();
219 let max_topic_size =
220 u64::from_le_bytes(bytes[position + 8..position + 16].try_into().unwrap());
221 let max_topic_size: MaxTopicSize = max_topic_size.into();
222 let replication_factor = bytes[position + 16];
223 let name_length = bytes[position + 17];
224 let name = from_utf8(&bytes[position + 18..position + 18 + name_length as usize])
225 .unwrap()
226 .to_string();
227
228 assert!(!bytes.is_empty());
229 assert_eq!(stream_id, command.stream_id);
230 assert_eq!(topic_id, command.topic_id);
231 assert_eq!(compression_algorithm, command.compression_algorithm);
232 assert_eq!(message_expiry, command.message_expiry);
233 assert_eq!(max_topic_size, command.max_topic_size);
234 assert_eq!(replication_factor, command.replication_factor.unwrap());
235 assert_eq!(name.len() as u8, command.name.len() as u8);
236 assert_eq!(name, command.name);
237 }
238
239 #[test]
240 fn from_bytes_should_fail_on_empty_input() {
241 assert!(UpdateTopic::from_bytes(Bytes::new()).is_err());
242 }
243
244 #[test]
245 fn from_bytes_should_fail_on_truncated_input() {
246 let command = UpdateTopic {
247 stream_id: Identifier::numeric(1).unwrap(),
248 topic_id: Identifier::numeric(2).unwrap(),
249 compression_algorithm: CompressionAlgorithm::None,
250 message_expiry: IggyExpiry::NeverExpire,
251 max_topic_size: MaxTopicSize::ServerDefault,
252 replication_factor: Some(1),
253 name: "test".to_string(),
254 };
255 let bytes = command.to_bytes();
256 for i in 0..bytes.len() - 1 {
257 let truncated = bytes.slice(..i);
258 assert!(
259 UpdateTopic::from_bytes(truncated).is_err(),
260 "expected error for truncation at byte {i}"
261 );
262 }
263 }
264
265 #[test]
266 fn should_be_deserialized_from_bytes() {
267 let stream_id = Identifier::numeric(1).unwrap();
268 let topic_id = Identifier::numeric(2).unwrap();
269 let compression_algorithm = CompressionAlgorithm::None;
270 let name = "test".to_string();
271 let message_expiry = IggyExpiry::NeverExpire;
272 let max_topic_size = MaxTopicSize::Custom(IggyByteSize::from(100));
273 let replication_factor = 1;
274
275 let stream_id_bytes = stream_id.to_bytes();
276 let topic_id_bytes = topic_id.to_bytes();
277 let mut bytes =
278 BytesMut::with_capacity(19 + stream_id_bytes.len() + topic_id_bytes.len() + name.len());
279 bytes.put_slice(&stream_id_bytes);
280 bytes.put_slice(&topic_id_bytes);
281 bytes.put_u8(compression_algorithm.as_code());
282 bytes.put_u64_le(message_expiry.into());
283 bytes.put_u64_le(max_topic_size.as_bytes_u64());
284 bytes.put_u8(replication_factor);
285
286 #[allow(clippy::cast_possible_truncation)]
287 bytes.put_u8(name.len() as u8);
288 bytes.put_slice(name.as_bytes());
289
290 let command = UpdateTopic::from_bytes(bytes.freeze());
291 assert!(command.is_ok());
292
293 let command = command.unwrap();
294 assert_eq!(command.stream_id, stream_id);
295 assert_eq!(command.topic_id, topic_id);
296 assert_eq!(command.compression_algorithm, compression_algorithm);
297 assert_eq!(command.message_expiry, message_expiry);
298 assert_eq!(command.max_topic_size, max_topic_size);
299 assert_eq!(command.replication_factor, Some(replication_factor));
300 assert_eq!(command.name, name);
301 }
302}