Skip to main content

iggy_cli/commands/binary_topics/
create_topic.rs

1/* Licensed to the Apache Software Foundation (ASF) under one
2 * or more contributor license agreements.  See the NOTICE file
3 * distributed with this work for additional information
4 * regarding copyright ownership.  The ASF licenses this file
5 * to you under the Apache License, Version 2.0 (the
6 * "License"); you may not use this file except in compliance
7 * with the License.  You may obtain a copy of the License at
8 *
9 *   http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied.  See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18
19use crate::commands::cli_command::{CliCommand, PRINT_TARGET};
20use anyhow::Context;
21use async_trait::async_trait;
22use core::fmt;
23use iggy_common::Client;
24use iggy_common::create_topic::CreateTopic;
25use iggy_common::{CompressionAlgorithm, Identifier, IggyExpiry, MaxTopicSize};
26use tracing::{Level, event};
27
28pub struct CreateTopicCmd {
29    create_topic: CreateTopic,
30    message_expiry: IggyExpiry,
31    max_topic_size: MaxTopicSize,
32    replication_factor: u8,
33}
34
35impl CreateTopicCmd {
36    #[allow(clippy::too_many_arguments)]
37    pub fn new(
38        stream_id: Identifier,
39        partitions_count: u32,
40        compression_algorithm: CompressionAlgorithm,
41        name: String,
42        message_expiry: IggyExpiry,
43        max_topic_size: MaxTopicSize,
44        replication_factor: u8,
45    ) -> Self {
46        Self {
47            create_topic: CreateTopic {
48                stream_id,
49                partitions_count,
50                compression_algorithm,
51                name,
52                message_expiry,
53                max_topic_size,
54                replication_factor: Some(replication_factor),
55            },
56            message_expiry,
57            max_topic_size,
58            replication_factor,
59        }
60    }
61}
62
63#[async_trait]
64impl CliCommand for CreateTopicCmd {
65    fn explain(&self) -> String {
66        format!("{self}")
67    }
68
69    async fn execute_cmd(&mut self, client: &dyn Client) -> anyhow::Result<(), anyhow::Error> {
70        client
71            .create_topic(
72                &self.create_topic.stream_id,
73                &self.create_topic.name,
74                self.create_topic.partitions_count,
75                self.create_topic.compression_algorithm,
76                self.create_topic.replication_factor,
77                self.create_topic.message_expiry,
78                self.create_topic.max_topic_size,
79            )
80            .await
81            .with_context(|| {
82                format!(
83                    "Problem creating topic (name: {}, partitions count: {}) in stream with ID: {}",
84                    self.create_topic.name,
85                    self.create_topic.partitions_count,
86                    self.create_topic.stream_id
87                )
88            })?;
89
90        event!(target: PRINT_TARGET, Level::INFO,
91            "Topic with name: {}, partitions count: {}, compression algorithm: {}, message expiry: {}, max topic size: {}, replication factor: {} created in stream with ID: {}",
92            self.create_topic.name,
93            self.create_topic.partitions_count,
94            self.create_topic.compression_algorithm,
95            self.message_expiry,
96            self.max_topic_size,
97            self.replication_factor,
98            self.create_topic.stream_id,
99        );
100
101        Ok(())
102    }
103}
104
105impl fmt::Display for CreateTopicCmd {
106    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
107        let topic_name = &self.create_topic.name;
108        let compression_algorithm = &self.create_topic.compression_algorithm;
109        let message_expiry = &self.message_expiry;
110        let max_topic_size = &self.max_topic_size;
111        let replication_factor = self.replication_factor;
112        let stream_id = &self.create_topic.stream_id;
113
114        write!(
115            f,
116            "create topic with name: {topic_name}, message expiry: {message_expiry}, compression algorithm: {compression_algorithm}, \
117            max topic size: {max_topic_size}, replication factor: {replication_factor} in stream with ID: {stream_id}",
118        )
119    }
120}