iggy_common/commands/streams/
update_stream.rs1use super::MAX_NAME_LENGTH;
20use crate::BytesSerializable;
21use crate::Identifier;
22use crate::Sizeable;
23use crate::Validatable;
24use crate::error::IggyError;
25use crate::{Command, UPDATE_STREAM_CODE};
26use bytes::{BufMut, Bytes, BytesMut};
27use serde::{Deserialize, Serialize};
28use std::fmt::Display;
29use std::str::from_utf8;
30
31#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
36pub struct UpdateStream {
37 #[serde(skip)]
39 pub stream_id: Identifier,
40 pub name: String,
42}
43
44impl Command for UpdateStream {
45 fn code(&self) -> u32 {
46 UPDATE_STREAM_CODE
47 }
48}
49
50impl Default for UpdateStream {
51 fn default() -> Self {
52 UpdateStream {
53 stream_id: Identifier::default(),
54 name: "stream".to_string(),
55 }
56 }
57}
58
59impl Validatable<IggyError> for UpdateStream {
60 fn validate(&self) -> Result<(), IggyError> {
61 if self.name.is_empty() || self.name.len() > MAX_NAME_LENGTH {
62 return Err(IggyError::InvalidStreamName);
63 }
64
65 Ok(())
66 }
67}
68
69impl BytesSerializable for UpdateStream {
70 fn to_bytes(&self) -> Bytes {
71 let stream_id_bytes = self.stream_id.to_bytes();
72 let mut bytes = BytesMut::with_capacity(1 + stream_id_bytes.len() + self.name.len());
73 bytes.put_slice(&stream_id_bytes);
74 #[allow(clippy::cast_possible_truncation)]
75 bytes.put_u8(self.name.len() as u8);
76 bytes.put_slice(self.name.as_bytes());
77 bytes.freeze()
78 }
79
80 fn from_bytes(bytes: Bytes) -> std::result::Result<UpdateStream, IggyError> {
81 if bytes.len() < 5 {
82 return Err(IggyError::InvalidCommand);
83 }
84
85 let mut position = 0;
86 let stream_id = Identifier::from_bytes(bytes.clone())?;
87 position += stream_id.get_size_bytes().as_bytes_usize();
88 let name_length = *bytes.get(position).ok_or(IggyError::InvalidCommand)? as usize;
89 let name = from_utf8(
90 bytes
91 .get(position + 1..position + 1 + name_length)
92 .ok_or(IggyError::InvalidCommand)?,
93 )
94 .map_err(|_| IggyError::InvalidUtf8)?
95 .to_string();
96
97 let command = UpdateStream { stream_id, name };
98 Ok(command)
99 }
100}
101
102impl Display for UpdateStream {
103 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
104 write!(f, "{}|{}", self.stream_id, self.name)
105 }
106}
107
#[cfg(test)]
mod tests {
    use super::*;

    /// Encodes a command and decodes the raw bytes by hand to confirm the
    /// layout: identifier, one-byte name length, name bytes.
    #[test]
    fn should_be_serialized_as_bytes() {
        let command = UpdateStream {
            stream_id: Identifier::numeric(1).unwrap(),
            name: "test".to_string(),
        };

        let encoded = command.to_bytes();
        assert!(!encoded.is_empty());

        let decoded_id = Identifier::from_bytes(encoded.clone()).unwrap();
        let length_index = decoded_id.get_size_bytes().as_bytes_usize();
        let name_start = length_index + 1;
        let name_end = name_start + usize::from(encoded[length_index]);
        let decoded_name = from_utf8(&encoded[name_start..name_end]).unwrap();

        assert_eq!(decoded_id, command.stream_id);
        assert_eq!(decoded_name, command.name.as_str());
    }

    /// Builds the wire bytes by hand and checks that `from_bytes` recovers
    /// both fields.
    #[test]
    fn should_be_deserialized_from_bytes() {
        let stream_id = Identifier::numeric(1).unwrap();
        let name = "test".to_string();

        let id_bytes = stream_id.to_bytes();
        let mut payload = BytesMut::with_capacity(id_bytes.len() + 1 + name.len());
        payload.put_slice(&id_bytes);
        #[allow(clippy::cast_possible_truncation)]
        payload.put_u8(name.len() as u8);
        payload.put_slice(name.as_bytes());

        let result = UpdateStream::from_bytes(payload.freeze());
        assert!(result.is_ok());

        let parsed = result.unwrap();
        assert_eq!(parsed.stream_id, stream_id);
        assert_eq!(parsed.name, name);
    }
}