Skip to main content

iggy_common/commands/streams/
update_stream.rs

1/* Licensed to the Apache Software Foundation (ASF) under one
2 * or more contributor license agreements.  See the NOTICE file
3 * distributed with this work for additional information
4 * regarding copyright ownership.  The ASF licenses this file
5 * to you under the Apache License, Version 2.0 (the
6 * "License"); you may not use this file except in compliance
7 * with the License.  You may obtain a copy of the License at
8 *
9 *   http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied.  See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18
19use super::MAX_NAME_LENGTH;
20use crate::BytesSerializable;
21use crate::Identifier;
22use crate::Sizeable;
23use crate::Validatable;
24use crate::error::IggyError;
25use crate::{Command, UPDATE_STREAM_CODE};
26use bytes::{BufMut, Bytes, BytesMut};
27use serde::{Deserialize, Serialize};
28use std::fmt::Display;
29use std::str::from_utf8;
30
31/// `UpdateStream` command is used to update an existing stream.
32/// It has additional payload:
33/// - `stream_id` - unique stream ID (numeric or name).
34/// - `name` - unique stream name (string), max length is 255 characters.
35#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
36pub struct UpdateStream {
37    /// Unique stream ID (numeric or name).
38    #[serde(skip)]
39    pub stream_id: Identifier,
40    /// Unique stream name (string), max length is 255 characters.
41    pub name: String,
42}
43
44impl Command for UpdateStream {
45    fn code(&self) -> u32 {
46        UPDATE_STREAM_CODE
47    }
48}
49
50impl Default for UpdateStream {
51    fn default() -> Self {
52        UpdateStream {
53            stream_id: Identifier::default(),
54            name: "stream".to_string(),
55        }
56    }
57}
58
59impl Validatable<IggyError> for UpdateStream {
60    fn validate(&self) -> Result<(), IggyError> {
61        if self.name.is_empty() || self.name.len() > MAX_NAME_LENGTH {
62            return Err(IggyError::InvalidStreamName);
63        }
64
65        Ok(())
66    }
67}
68
69impl BytesSerializable for UpdateStream {
70    fn to_bytes(&self) -> Bytes {
71        let stream_id_bytes = self.stream_id.to_bytes();
72        let mut bytes = BytesMut::with_capacity(1 + stream_id_bytes.len() + self.name.len());
73        bytes.put_slice(&stream_id_bytes);
74        #[allow(clippy::cast_possible_truncation)]
75        bytes.put_u8(self.name.len() as u8);
76        bytes.put_slice(self.name.as_bytes());
77        bytes.freeze()
78    }
79
80    fn from_bytes(bytes: Bytes) -> std::result::Result<UpdateStream, IggyError> {
81        if bytes.len() < 5 {
82            return Err(IggyError::InvalidCommand);
83        }
84
85        let mut position = 0;
86        let stream_id = Identifier::from_bytes(bytes.clone())?;
87        position += stream_id.get_size_bytes().as_bytes_usize();
88        let name_length = *bytes.get(position).ok_or(IggyError::InvalidCommand)? as usize;
89        let name = from_utf8(
90            bytes
91                .get(position + 1..position + 1 + name_length)
92                .ok_or(IggyError::InvalidCommand)?,
93        )
94        .map_err(|_| IggyError::InvalidUtf8)?
95        .to_string();
96
97        let command = UpdateStream { stream_id, name };
98        Ok(command)
99    }
100}
101
102impl Display for UpdateStream {
103    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
104        write!(f, "{}|{}", self.stream_id, self.name)
105    }
106}
107
108#[cfg(test)]
109mod tests {
110    use super::*;
111
112    #[test]
113    fn should_be_serialized_as_bytes() {
114        let command = UpdateStream {
115            stream_id: Identifier::numeric(1).unwrap(),
116            name: "test".to_string(),
117        };
118
119        let bytes = command.to_bytes();
120        let mut position = 0;
121        let stream_id = Identifier::from_bytes(bytes.clone()).unwrap();
122        position += stream_id.get_size_bytes().as_bytes_usize();
123        let name_length = bytes[position];
124        let name = from_utf8(&bytes[position + 1..position + 1 + name_length as usize])
125            .unwrap()
126            .to_string();
127
128        assert!(!bytes.is_empty());
129        assert_eq!(stream_id, command.stream_id);
130        assert_eq!(name, command.name);
131    }
132
133    #[test]
134    fn should_be_deserialized_from_bytes() {
135        let stream_id = Identifier::numeric(1).unwrap();
136        let name = "test".to_string();
137
138        let stream_id_bytes = stream_id.to_bytes();
139        let mut bytes = BytesMut::with_capacity(1 + stream_id_bytes.len() + name.len());
140        bytes.put_slice(&stream_id_bytes);
141        #[allow(clippy::cast_possible_truncation)]
142        bytes.put_u8(name.len() as u8);
143        bytes.put_slice(name.as_bytes());
144        let command = UpdateStream::from_bytes(bytes.freeze());
145        assert!(command.is_ok());
146
147        let command = command.unwrap();
148        assert_eq!(command.stream_id, stream_id);
149        assert_eq!(command.name, name);
150    }
151}