Skip to main content

iggy_common/commands/topics/
update_topic.rs

1/* Licensed to the Apache Software Foundation (ASF) under one
2 * or more contributor license agreements.  See the NOTICE file
3 * distributed with this work for additional information
4 * regarding copyright ownership.  The ASF licenses this file
5 * to you under the Apache License, Version 2.0 (the
6 * "License"); you may not use this file except in compliance
7 * with the License.  You may obtain a copy of the License at
8 *
9 *   http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied.  See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18
19use super::MAX_NAME_LENGTH;
20use crate::BytesSerializable;
21use crate::CompressionAlgorithm;
22use crate::Identifier;
23use crate::Sizeable;
24use crate::Validatable;
25use crate::error::IggyError;
26use crate::utils::expiry::IggyExpiry;
27use crate::utils::topic_size::MaxTopicSize;
28use crate::{Command, UPDATE_TOPIC_CODE};
29use bytes::{BufMut, Bytes, BytesMut};
30use serde::{Deserialize, Serialize};
31use std::fmt::Display;
32use std::str::from_utf8;
33
34/// `UpdateTopic` command is used to update a topic in a stream.
35/// It has additional payload:
36/// - `stream_id` - unique stream ID (numeric or name).
37/// - `topic_id` - unique topic ID (numeric or name).
38/// - `message_expiry` - message expiry, if `NeverExpire` then messages will never expire.
39/// - `max_topic_size` - maximum size of the topic in bytes, if `Unlimited` then topic size is unlimited.
40///   Can't be lower than segment size in the config.
41/// - `replication_factor` - replication factor for the topic.
42/// - `name` - unique topic name, max length is 255 characters.
43#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
44pub struct UpdateTopic {
45    /// Unique stream ID (numeric or name).
46    #[serde(skip)]
47    pub stream_id: Identifier,
48    /// Unique topic ID (numeric or name).
49    #[serde(skip)]
50    pub topic_id: Identifier,
51    /// Compression algorithm for the topic.
52    pub compression_algorithm: CompressionAlgorithm,
53    /// Message expiry, if `NeverExpire` then messages will never expire.
54    pub message_expiry: IggyExpiry,
55    /// Max topic size, if `Unlimited` then topic size is unlimited.
56    /// Can't be lower than segment size in the config.
57    pub max_topic_size: MaxTopicSize,
58    /// Replication factor for the topic.
59    pub replication_factor: Option<u8>,
60    /// Unique topic name, max length is 255 characters.
61    pub name: String,
62}
63
64impl Command for UpdateTopic {
65    fn code(&self) -> u32 {
66        UPDATE_TOPIC_CODE
67    }
68}
69
70impl Default for UpdateTopic {
71    fn default() -> Self {
72        UpdateTopic {
73            stream_id: Identifier::default(),
74            topic_id: Identifier::default(),
75            compression_algorithm: Default::default(),
76            message_expiry: IggyExpiry::NeverExpire,
77            max_topic_size: MaxTopicSize::ServerDefault,
78            replication_factor: None,
79            name: "topic".to_string(),
80        }
81    }
82}
83
84impl Validatable<IggyError> for UpdateTopic {
85    fn validate(&self) -> Result<(), IggyError> {
86        if self.name.is_empty() || self.name.len() > MAX_NAME_LENGTH {
87            return Err(IggyError::InvalidTopicName);
88        }
89
90        if let Some(replication_factor) = self.replication_factor
91            && replication_factor == 0
92        {
93            return Err(IggyError::InvalidReplicationFactor);
94        }
95
96        Ok(())
97    }
98}
99
100impl BytesSerializable for UpdateTopic {
101    fn to_bytes(&self) -> Bytes {
102        let stream_id_bytes = self.stream_id.to_bytes();
103        let topic_id_bytes = self.topic_id.to_bytes();
104        let mut bytes = BytesMut::with_capacity(
105            19 + stream_id_bytes.len() + topic_id_bytes.len() + self.name.len(),
106        );
107        bytes.put_slice(&stream_id_bytes.clone());
108        bytes.put_slice(&topic_id_bytes.clone());
109        bytes.put_u8(self.compression_algorithm.as_code());
110        bytes.put_u64_le(self.message_expiry.into());
111        bytes.put_u64_le(self.max_topic_size.into());
112        match self.replication_factor {
113            Some(replication_factor) => bytes.put_u8(replication_factor),
114            None => bytes.put_u8(0),
115        }
116        #[allow(clippy::cast_possible_truncation)]
117        bytes.put_u8(self.name.len() as u8);
118        bytes.put_slice(self.name.as_bytes());
119        bytes.freeze()
120    }
121
122    fn from_bytes(bytes: Bytes) -> Result<UpdateTopic, IggyError> {
123        if bytes.len() < 21 {
124            return Err(IggyError::InvalidCommand);
125        }
126        let mut position = 0;
127        let stream_id = Identifier::from_bytes(bytes.clone())?;
128        position += stream_id.get_size_bytes().as_bytes_usize();
129        let topic_id = Identifier::from_bytes(bytes.slice(position..))?;
130        position += topic_id.get_size_bytes().as_bytes_usize();
131        let compression_algorithm = CompressionAlgorithm::from_code(
132            *bytes.get(position).ok_or(IggyError::InvalidCommand)?,
133        )?;
134        position += 1;
135        let message_expiry = u64::from_le_bytes(
136            bytes
137                .get(position..position + 8)
138                .ok_or(IggyError::InvalidCommand)?
139                .try_into()
140                .map_err(|_| IggyError::InvalidNumberEncoding)?,
141        );
142        let message_expiry: IggyExpiry = message_expiry.into();
143        let max_topic_size = u64::from_le_bytes(
144            bytes
145                .get(position + 8..position + 16)
146                .ok_or(IggyError::InvalidCommand)?
147                .try_into()
148                .map_err(|_| IggyError::InvalidNumberEncoding)?,
149        );
150        let max_topic_size: MaxTopicSize = max_topic_size.into();
151        let replication_factor = match *bytes.get(position + 16).ok_or(IggyError::InvalidCommand)? {
152            0 => None,
153            factor => Some(factor),
154        };
155        let name_length = *bytes.get(position + 17).ok_or(IggyError::InvalidCommand)? as usize;
156        let name = from_utf8(
157            bytes
158                .get(position + 18..position + 18 + name_length)
159                .ok_or(IggyError::InvalidCommand)?,
160        )
161        .map_err(|_| IggyError::InvalidUtf8)?
162        .to_string();
163        let command = UpdateTopic {
164            stream_id,
165            topic_id,
166            compression_algorithm,
167            message_expiry,
168            max_topic_size,
169            replication_factor,
170            name,
171        };
172        Ok(command)
173    }
174}
175
176impl Display for UpdateTopic {
177    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
178        write!(
179            f,
180            "{}|{}|{}|{}|{}|{}",
181            self.stream_id,
182            self.topic_id,
183            self.message_expiry,
184            self.max_topic_size,
185            self.replication_factor.unwrap_or(0),
186            self.name,
187        )
188    }
189}
190
191#[cfg(test)]
192mod tests {
193    use super::*;
194    use crate::utils::byte_size::IggyByteSize;
195    use bytes::BufMut;
196
197    #[test]
198    fn should_be_serialized_as_bytes() {
199        let command = UpdateTopic {
200            stream_id: Identifier::numeric(1).unwrap(),
201            topic_id: Identifier::numeric(2).unwrap(),
202            compression_algorithm: CompressionAlgorithm::None,
203            message_expiry: IggyExpiry::NeverExpire,
204            max_topic_size: MaxTopicSize::ServerDefault,
205            replication_factor: Some(1),
206            name: "test".to_string(),
207        };
208
209        let bytes = command.to_bytes();
210        let mut position = 0;
211        let stream_id = Identifier::from_bytes(bytes.clone()).unwrap();
212        position += stream_id.get_size_bytes().as_bytes_usize();
213        let topic_id = Identifier::from_bytes(bytes.slice(position..)).unwrap();
214        position += topic_id.get_size_bytes().as_bytes_usize();
215        let compression_algorithm = CompressionAlgorithm::from_code(bytes[position]).unwrap();
216        position += 1;
217        let message_expiry = u64::from_le_bytes(bytes[position..position + 8].try_into().unwrap());
218        let message_expiry: IggyExpiry = message_expiry.into();
219        let max_topic_size =
220            u64::from_le_bytes(bytes[position + 8..position + 16].try_into().unwrap());
221        let max_topic_size: MaxTopicSize = max_topic_size.into();
222        let replication_factor = bytes[position + 16];
223        let name_length = bytes[position + 17];
224        let name = from_utf8(&bytes[position + 18..position + 18 + name_length as usize])
225            .unwrap()
226            .to_string();
227
228        assert!(!bytes.is_empty());
229        assert_eq!(stream_id, command.stream_id);
230        assert_eq!(topic_id, command.topic_id);
231        assert_eq!(compression_algorithm, command.compression_algorithm);
232        assert_eq!(message_expiry, command.message_expiry);
233        assert_eq!(max_topic_size, command.max_topic_size);
234        assert_eq!(replication_factor, command.replication_factor.unwrap());
235        assert_eq!(name.len() as u8, command.name.len() as u8);
236        assert_eq!(name, command.name);
237    }
238
239    #[test]
240    fn from_bytes_should_fail_on_empty_input() {
241        assert!(UpdateTopic::from_bytes(Bytes::new()).is_err());
242    }
243
244    #[test]
245    fn from_bytes_should_fail_on_truncated_input() {
246        let command = UpdateTopic {
247            stream_id: Identifier::numeric(1).unwrap(),
248            topic_id: Identifier::numeric(2).unwrap(),
249            compression_algorithm: CompressionAlgorithm::None,
250            message_expiry: IggyExpiry::NeverExpire,
251            max_topic_size: MaxTopicSize::ServerDefault,
252            replication_factor: Some(1),
253            name: "test".to_string(),
254        };
255        let bytes = command.to_bytes();
256        for i in 0..bytes.len() - 1 {
257            let truncated = bytes.slice(..i);
258            assert!(
259                UpdateTopic::from_bytes(truncated).is_err(),
260                "expected error for truncation at byte {i}"
261            );
262        }
263    }
264
265    #[test]
266    fn should_be_deserialized_from_bytes() {
267        let stream_id = Identifier::numeric(1).unwrap();
268        let topic_id = Identifier::numeric(2).unwrap();
269        let compression_algorithm = CompressionAlgorithm::None;
270        let name = "test".to_string();
271        let message_expiry = IggyExpiry::NeverExpire;
272        let max_topic_size = MaxTopicSize::Custom(IggyByteSize::from(100));
273        let replication_factor = 1;
274
275        let stream_id_bytes = stream_id.to_bytes();
276        let topic_id_bytes = topic_id.to_bytes();
277        let mut bytes =
278            BytesMut::with_capacity(19 + stream_id_bytes.len() + topic_id_bytes.len() + name.len());
279        bytes.put_slice(&stream_id_bytes);
280        bytes.put_slice(&topic_id_bytes);
281        bytes.put_u8(compression_algorithm.as_code());
282        bytes.put_u64_le(message_expiry.into());
283        bytes.put_u64_le(max_topic_size.as_bytes_u64());
284        bytes.put_u8(replication_factor);
285
286        #[allow(clippy::cast_possible_truncation)]
287        bytes.put_u8(name.len() as u8);
288        bytes.put_slice(name.as_bytes());
289
290        let command = UpdateTopic::from_bytes(bytes.freeze());
291        assert!(command.is_ok());
292
293        let command = command.unwrap();
294        assert_eq!(command.stream_id, stream_id);
295        assert_eq!(command.topic_id, topic_id);
296        assert_eq!(command.compression_algorithm, compression_algorithm);
297        assert_eq!(command.message_expiry, message_expiry);
298        assert_eq!(command.max_topic_size, max_topic_size);
299        assert_eq!(command.replication_factor, Some(replication_factor));
300        assert_eq!(command.name, name);
301    }
302}