Skip to main content

iggy_common/commands/topics/
create_topic.rs

1/* Licensed to the Apache Software Foundation (ASF) under one
2 * or more contributor license agreements.  See the NOTICE file
3 * distributed with this work for additional information
4 * regarding copyright ownership.  The ASF licenses this file
5 * to you under the Apache License, Version 2.0 (the
6 * "License"); you may not use this file except in compliance
7 * with the License.  You may obtain a copy of the License at
8 *
9 *   http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied.  See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18
19use super::{MAX_NAME_LENGTH, MAX_PARTITIONS_COUNT};
20use crate::BytesSerializable;
21use crate::CompressionAlgorithm;
22use crate::Identifier;
23use crate::Sizeable;
24use crate::Validatable;
25use crate::error::IggyError;
26use crate::utils::expiry::IggyExpiry;
27use crate::utils::topic_size::MaxTopicSize;
28use crate::{CREATE_TOPIC_CODE, Command};
29use bytes::{BufMut, Bytes, BytesMut};
30use serde::{Deserialize, Serialize};
31use std::fmt::Display;
32use std::str::from_utf8;
33
34/// `CreateTopic` command is used to create a new topic in a stream.
35/// It has additional payload:
36/// - `stream_id` - unique stream ID (numeric or name).
37/// - `partitions_count` - number of partitions in the topic, max value is 1000.
38/// - `message_expiry` - message expiry, if `NeverExpire` then messages will never expire.
39/// - `max_topic_size` - maximum size of the topic, if `Unlimited` then topic size is unlimited.
40///   Can't be lower than segment size in the config.
41/// - `replication_factor` - replication factor for the topic.
42/// - `name` - unique topic name, max length is 255 characters.
43#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
44pub struct CreateTopic {
45    /// Unique stream ID (numeric or name).
46    #[serde(skip)]
47    pub stream_id: Identifier,
48    /// Number of partitions in the topic, max value is 1000.
49    pub partitions_count: u32,
50    /// Compression algorithm for the topic.
51    pub compression_algorithm: CompressionAlgorithm,
52    /// Message expiry, if `NeverExpire` then messages will never expire.
53    pub message_expiry: IggyExpiry,
54    /// Max topic size, if `Unlimited` then topic size is unlimited.
55    /// Can't be lower than segment size in the config.
56    pub max_topic_size: MaxTopicSize,
57    /// Replication factor for the topic.
58    pub replication_factor: Option<u8>,
59    /// Unique topic name, max length is 255 characters.
60    pub name: String,
61}
62
63impl Command for CreateTopic {
64    fn code(&self) -> u32 {
65        CREATE_TOPIC_CODE
66    }
67}
68
69impl Default for CreateTopic {
70    fn default() -> Self {
71        CreateTopic {
72            stream_id: Identifier::default(),
73            partitions_count: 1,
74            compression_algorithm: CompressionAlgorithm::None,
75            message_expiry: IggyExpiry::NeverExpire,
76            max_topic_size: MaxTopicSize::ServerDefault,
77            replication_factor: None,
78            name: "topic".to_string(),
79        }
80    }
81}
82
83impl Validatable<IggyError> for CreateTopic {
84    fn validate(&self) -> Result<(), IggyError> {
85        if self.name.is_empty() || self.name.len() > MAX_NAME_LENGTH {
86            return Err(IggyError::InvalidTopicName);
87        }
88
89        if !(0..=MAX_PARTITIONS_COUNT).contains(&self.partitions_count) {
90            return Err(IggyError::TooManyPartitions);
91        }
92
93        if let Some(replication_factor) = self.replication_factor
94            && replication_factor == 0
95        {
96            return Err(IggyError::InvalidReplicationFactor);
97        }
98
99        Ok(())
100    }
101}
102
103impl BytesSerializable for CreateTopic {
104    fn to_bytes(&self) -> Bytes {
105        let stream_id_bytes = self.stream_id.to_bytes();
106        let mut bytes = BytesMut::with_capacity(19 + stream_id_bytes.len() + self.name.len());
107        bytes.put_slice(&stream_id_bytes);
108        bytes.put_u32_le(self.partitions_count);
109        bytes.put_u8(self.compression_algorithm.as_code());
110        bytes.put_u64_le(self.message_expiry.into());
111        bytes.put_u64_le(self.max_topic_size.into());
112        match self.replication_factor {
113            Some(replication_factor) => bytes.put_u8(replication_factor),
114            None => bytes.put_u8(0),
115        }
116        #[allow(clippy::cast_possible_truncation)]
117        bytes.put_u8(self.name.len() as u8);
118        bytes.put_slice(self.name.as_bytes());
119        bytes.freeze()
120    }
121
122    fn from_bytes(bytes: Bytes) -> std::result::Result<CreateTopic, IggyError> {
123        if bytes.len() < 14 {
124            return Err(IggyError::InvalidCommand);
125        }
126        let mut position = 0;
127        let stream_id = Identifier::from_bytes(bytes.clone())?;
128        position += stream_id.get_size_bytes().as_bytes_usize();
129        let partitions_count = u32::from_le_bytes(
130            bytes
131                .get(position..position + 4)
132                .ok_or(IggyError::InvalidCommand)?
133                .try_into()
134                .map_err(|_| IggyError::InvalidNumberEncoding)?,
135        );
136        let compression_algorithm = CompressionAlgorithm::from_code(
137            *bytes.get(position + 4).ok_or(IggyError::InvalidCommand)?,
138        )?;
139        let message_expiry = u64::from_le_bytes(
140            bytes
141                .get(position + 5..position + 13)
142                .ok_or(IggyError::InvalidCommand)?
143                .try_into()
144                .map_err(|_| IggyError::InvalidNumberEncoding)?,
145        );
146        let message_expiry: IggyExpiry = message_expiry.into();
147        let max_topic_size = u64::from_le_bytes(
148            bytes
149                .get(position + 13..position + 21)
150                .ok_or(IggyError::InvalidCommand)?
151                .try_into()
152                .map_err(|_| IggyError::InvalidNumberEncoding)?,
153        );
154        let max_topic_size: MaxTopicSize = max_topic_size.into();
155        let replication_factor = match *bytes.get(position + 21).ok_or(IggyError::InvalidCommand)? {
156            0 => None,
157            factor => Some(factor),
158        };
159        let name_length = *bytes.get(position + 22).ok_or(IggyError::InvalidCommand)? as usize;
160        let name = from_utf8(
161            bytes
162                .get(position + 23..position + 23 + name_length)
163                .ok_or(IggyError::InvalidCommand)?,
164        )
165        .map_err(|_| IggyError::InvalidUtf8)?
166        .to_string();
167        let command = CreateTopic {
168            stream_id,
169            partitions_count,
170            compression_algorithm,
171            message_expiry,
172            max_topic_size,
173            replication_factor,
174            name,
175        };
176        Ok(command)
177    }
178}
179
180impl Display for CreateTopic {
181    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
182        write!(
183            f,
184            "{}|{}|{}|{}|{}|{}",
185            self.stream_id,
186            self.partitions_count,
187            self.message_expiry,
188            self.max_topic_size,
189            self.replication_factor.unwrap_or(0),
190            self.name
191        )
192    }
193}
194
195#[cfg(test)]
196mod tests {
197    use super::*;
198    use bytes::BufMut;
199
200    #[test]
201    fn should_be_serialized_as_bytes() {
202        let command = CreateTopic {
203            stream_id: Identifier::numeric(1).unwrap(),
204            partitions_count: 3,
205            message_expiry: IggyExpiry::NeverExpire,
206            compression_algorithm: CompressionAlgorithm::None,
207            max_topic_size: MaxTopicSize::ServerDefault,
208            replication_factor: Some(1),
209            name: "test".to_string(),
210        };
211        let bytes = command.to_bytes();
212        let mut position = 0;
213        let stream_id = Identifier::from_bytes(bytes.clone()).unwrap();
214        position += stream_id.get_size_bytes().as_bytes_usize();
215        let partitions_count =
216            u32::from_le_bytes(bytes[position..position + 4].try_into().unwrap());
217        let compression_algorithm = CompressionAlgorithm::from_code(bytes[position + 4]).unwrap();
218        let message_expiry =
219            u64::from_le_bytes(bytes[position + 5..position + 13].try_into().unwrap());
220        let message_expiry: IggyExpiry = message_expiry.into();
221        let max_topic_size =
222            u64::from_le_bytes(bytes[position + 13..position + 21].try_into().unwrap());
223        let max_topic_size: MaxTopicSize = max_topic_size.into();
224        let replication_factor = bytes[position + 21];
225        let name_length = bytes[position + 22];
226        let name = from_utf8(&bytes[position + 23..(position + 23 + name_length as usize)])
227            .unwrap()
228            .to_string();
229
230        assert!(!bytes.is_empty());
231        assert_eq!(stream_id, command.stream_id);
232        assert_eq!(partitions_count, command.partitions_count);
233        assert_eq!(compression_algorithm, command.compression_algorithm);
234        assert_eq!(message_expiry, command.message_expiry);
235        assert_eq!(max_topic_size, command.max_topic_size);
236        assert_eq!(replication_factor, command.replication_factor.unwrap());
237        assert_eq!(name.len() as u8, command.name.len() as u8);
238        assert_eq!(name, command.name);
239    }
240
241    #[test]
242    fn from_bytes_should_fail_on_empty_input() {
243        assert!(CreateTopic::from_bytes(Bytes::new()).is_err());
244    }
245
246    #[test]
247    fn from_bytes_should_fail_on_truncated_input() {
248        let command = CreateTopic {
249            stream_id: Identifier::numeric(1).unwrap(),
250            partitions_count: 3,
251            compression_algorithm: CompressionAlgorithm::None,
252            message_expiry: IggyExpiry::NeverExpire,
253            max_topic_size: MaxTopicSize::ServerDefault,
254            replication_factor: Some(1),
255            name: "test".to_string(),
256        };
257        let bytes = command.to_bytes();
258        for i in 0..bytes.len() - 1 {
259            let truncated = bytes.slice(..i);
260            assert!(
261                CreateTopic::from_bytes(truncated).is_err(),
262                "expected error for truncation at byte {i}"
263            );
264        }
265    }
266
267    #[test]
268    fn should_be_deserialized_from_bytes() {
269        let stream_id = Identifier::numeric(1).unwrap();
270        let partitions_count = 3u32;
271        let compression_algorithm = CompressionAlgorithm::None;
272        let name = "test".to_string();
273        let message_expiry = IggyExpiry::NeverExpire;
274        let max_topic_size = MaxTopicSize::ServerDefault;
275        let replication_factor = 1;
276        let stream_id_bytes = stream_id.to_bytes();
277        let mut bytes = BytesMut::with_capacity(10 + stream_id_bytes.len() + name.len());
278        bytes.put_slice(&stream_id_bytes);
279        bytes.put_u32_le(partitions_count);
280        bytes.put_u8(compression_algorithm.as_code());
281        bytes.put_u64_le(message_expiry.into());
282        bytes.put_u64_le(max_topic_size.as_bytes_u64());
283        bytes.put_u8(replication_factor);
284        #[allow(clippy::cast_possible_truncation)]
285        bytes.put_u8(name.len() as u8);
286        bytes.put_slice(name.as_bytes());
287
288        let command = CreateTopic::from_bytes(bytes.freeze());
289        assert!(command.is_ok());
290
291        let command = command.unwrap();
292        assert_eq!(command.stream_id, stream_id);
293        assert_eq!(command.name, name);
294        assert_eq!(command.partitions_count, partitions_count);
295        assert_eq!(command.compression_algorithm, compression_algorithm);
296        assert_eq!(command.message_expiry, message_expiry);
297        assert_eq!(command.max_topic_size, max_topic_size);
298        assert_eq!(command.replication_factor.unwrap(), replication_factor);
299        assert_eq!(command.partitions_count, partitions_count);
300    }
301}