iggy_common/commands/topics/
purge_topic.rs1use crate::BytesSerializable;
20use crate::Identifier;
21use crate::Sizeable;
22use crate::Validatable;
23use crate::error::IggyError;
24use crate::{Command, PURGE_TOPIC_CODE};
25use bytes::{BufMut, Bytes, BytesMut};
26use serde::{Deserialize, Serialize};
27use std::fmt::Display;
28
29#[derive(Debug, Serialize, Deserialize, PartialEq, Default, Clone)]
34pub struct PurgeTopic {
35 #[serde(skip)]
37 pub stream_id: Identifier,
38 #[serde(skip)]
40 pub topic_id: Identifier,
41}
42
43impl Command for PurgeTopic {
44 fn code(&self) -> u32 {
45 PURGE_TOPIC_CODE
46 }
47}
48
49impl Validatable<IggyError> for PurgeTopic {
50 fn validate(&self) -> Result<(), IggyError> {
51 Ok(())
52 }
53}
54
55impl BytesSerializable for PurgeTopic {
56 fn to_bytes(&self) -> Bytes {
57 let stream_id_bytes = self.stream_id.to_bytes();
58 let topic_id_bytes = self.topic_id.to_bytes();
59 let mut bytes = BytesMut::with_capacity(stream_id_bytes.len() + topic_id_bytes.len());
60 bytes.put_slice(&stream_id_bytes);
61 bytes.put_slice(&topic_id_bytes);
62 bytes.freeze()
63 }
64
65 fn from_bytes(bytes: Bytes) -> Result<PurgeTopic, IggyError> {
66 if bytes.len() < 10 {
67 return Err(IggyError::InvalidCommand);
68 }
69
70 let mut position = 0;
71 let stream_id = Identifier::from_bytes(bytes.clone())?;
72 position += stream_id.get_size_bytes().as_bytes_usize();
73 let topic_id = Identifier::from_bytes(bytes.slice(position..))?;
74 let command = PurgeTopic {
75 stream_id,
76 topic_id,
77 };
78 Ok(command)
79 }
80}
81
82impl Display for PurgeTopic {
83 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
84 write!(f, "{}|{}", self.stream_id, self.topic_id)
85 }
86}
87
#[cfg(test)]
mod tests {
    use super::*;

    /// Serializes a command and checks that the two identifiers can be read
    /// back, in order, from the produced bytes.
    #[test]
    fn should_be_serialized_as_bytes() {
        let stream = Identifier::numeric(1).unwrap();
        let topic = Identifier::numeric(2).unwrap();
        let command = PurgeTopic {
            stream_id: stream,
            topic_id: topic,
        };

        let encoded = command.to_bytes();
        let decoded_stream = Identifier::from_bytes(encoded.clone()).unwrap();
        // The topic identifier starts where the stream identifier ends.
        let topic_start = decoded_stream.get_size_bytes().as_bytes_usize();
        let decoded_topic = Identifier::from_bytes(encoded.slice(topic_start..)).unwrap();

        assert!(!encoded.is_empty());
        assert_eq!(decoded_stream, command.stream_id);
        assert_eq!(decoded_topic, command.topic_id);
    }

    /// Builds the wire payload by hand and checks that `from_bytes` recovers
    /// both identifiers.
    #[test]
    fn should_be_deserialized_from_bytes() {
        let expected_stream = Identifier::numeric(1).unwrap();
        let expected_topic = Identifier::numeric(2).unwrap();

        let mut payload = BytesMut::new();
        payload.put_slice(&expected_stream.to_bytes());
        payload.put_slice(&expected_topic.to_bytes());

        let result = PurgeTopic::from_bytes(payload.freeze());
        assert!(result.is_ok());

        let parsed = result.unwrap();
        assert_eq!(parsed.stream_id, expected_stream);
        assert_eq!(parsed.topic_id, expected_topic);
    }
}