Skip to main content

iggy_common/commands/topics/
purge_topic.rs

1/* Licensed to the Apache Software Foundation (ASF) under one
2 * or more contributor license agreements.  See the NOTICE file
3 * distributed with this work for additional information
4 * regarding copyright ownership.  The ASF licenses this file
5 * to you under the Apache License, Version 2.0 (the
6 * "License"); you may not use this file except in compliance
7 * with the License.  You may obtain a copy of the License at
8 *
9 *   http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied.  See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18
19use crate::BytesSerializable;
20use crate::Identifier;
21use crate::Sizeable;
22use crate::Validatable;
23use crate::error::IggyError;
24use crate::{Command, PURGE_TOPIC_CODE};
25use bytes::{BufMut, Bytes, BytesMut};
26use serde::{Deserialize, Serialize};
27use std::fmt::Display;
28
29/// `PurgeTopic` command is used to purge topic data (its messages in all the partitions) from a stream.
30/// It has additional payload:
31/// - `stream_id` - unique stream ID (numeric or name).
32/// - `topic_id` - unique topic ID (numeric or name).
33#[derive(Debug, Serialize, Deserialize, PartialEq, Default, Clone)]
34pub struct PurgeTopic {
35    /// Unique stream ID (numeric or name).
36    #[serde(skip)]
37    pub stream_id: Identifier,
38    /// Unique topic ID (numeric or name).
39    #[serde(skip)]
40    pub topic_id: Identifier,
41}
42
43impl Command for PurgeTopic {
44    fn code(&self) -> u32 {
45        PURGE_TOPIC_CODE
46    }
47}
48
49impl Validatable<IggyError> for PurgeTopic {
50    fn validate(&self) -> Result<(), IggyError> {
51        Ok(())
52    }
53}
54
55impl BytesSerializable for PurgeTopic {
56    fn to_bytes(&self) -> Bytes {
57        let stream_id_bytes = self.stream_id.to_bytes();
58        let topic_id_bytes = self.topic_id.to_bytes();
59        let mut bytes = BytesMut::with_capacity(stream_id_bytes.len() + topic_id_bytes.len());
60        bytes.put_slice(&stream_id_bytes);
61        bytes.put_slice(&topic_id_bytes);
62        bytes.freeze()
63    }
64
65    fn from_bytes(bytes: Bytes) -> Result<PurgeTopic, IggyError> {
66        if bytes.len() < 10 {
67            return Err(IggyError::InvalidCommand);
68        }
69
70        let mut position = 0;
71        let stream_id = Identifier::from_bytes(bytes.clone())?;
72        position += stream_id.get_size_bytes().as_bytes_usize();
73        let topic_id = Identifier::from_bytes(bytes.slice(position..))?;
74        let command = PurgeTopic {
75            stream_id,
76            topic_id,
77        };
78        Ok(command)
79    }
80}
81
82impl Display for PurgeTopic {
83    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
84        write!(f, "{}|{}", self.stream_id, self.topic_id)
85    }
86}
87
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the numeric stream and topic identifiers shared by the tests.
    fn sample_identifiers() -> (Identifier, Identifier) {
        (
            Identifier::numeric(1).unwrap(),
            Identifier::numeric(2).unwrap(),
        )
    }

    #[test]
    fn should_be_serialized_as_bytes() {
        let (stream_id, topic_id) = sample_identifiers();
        let command = PurgeTopic {
            stream_id,
            topic_id,
        };

        let serialized = command.to_bytes();
        // Decode the stream identifier first; its size gives the offset at which
        // the topic identifier is read.
        let decoded_stream_id = Identifier::from_bytes(serialized.clone()).unwrap();
        let topic_offset = decoded_stream_id.get_size_bytes().as_bytes_usize();
        let decoded_topic_id = Identifier::from_bytes(serialized.slice(topic_offset..)).unwrap();

        assert!(!serialized.is_empty());
        assert_eq!(decoded_stream_id, command.stream_id);
        assert_eq!(decoded_topic_id, command.topic_id);
    }

    #[test]
    fn should_be_deserialized_from_bytes() {
        let (stream_id, topic_id) = sample_identifiers();

        // Hand-build the payload: stream identifier bytes, then topic identifier bytes.
        let mut payload = BytesMut::new();
        payload.put_slice(&stream_id.to_bytes());
        payload.put_slice(&topic_id.to_bytes());

        let command = PurgeTopic::from_bytes(payload.freeze())
            .expect("a payload built from two numeric identifiers should deserialize");

        assert_eq!(command.stream_id, stream_id);
        assert_eq!(command.topic_id, topic_id);
    }
}