iggy_cli/commands/binary_topics/
create_topic.rs1use crate::commands::cli_command::{CliCommand, PRINT_TARGET};
20use anyhow::Context;
21use async_trait::async_trait;
22use core::fmt;
23use iggy_common::Client;
24use iggy_common::create_topic::CreateTopic;
25use iggy_common::{CompressionAlgorithm, Identifier, IggyExpiry, MaxTopicSize};
26use tracing::{Level, event};
27
28pub struct CreateTopicCmd {
29 create_topic: CreateTopic,
30 message_expiry: IggyExpiry,
31 max_topic_size: MaxTopicSize,
32 replication_factor: u8,
33}
34
35impl CreateTopicCmd {
36 #[allow(clippy::too_many_arguments)]
37 pub fn new(
38 stream_id: Identifier,
39 partitions_count: u32,
40 compression_algorithm: CompressionAlgorithm,
41 name: String,
42 message_expiry: IggyExpiry,
43 max_topic_size: MaxTopicSize,
44 replication_factor: u8,
45 ) -> Self {
46 Self {
47 create_topic: CreateTopic {
48 stream_id,
49 partitions_count,
50 compression_algorithm,
51 name,
52 message_expiry,
53 max_topic_size,
54 replication_factor: Some(replication_factor),
55 },
56 message_expiry,
57 max_topic_size,
58 replication_factor,
59 }
60 }
61}
62
63#[async_trait]
64impl CliCommand for CreateTopicCmd {
65 fn explain(&self) -> String {
66 format!("{self}")
67 }
68
69 async fn execute_cmd(&mut self, client: &dyn Client) -> anyhow::Result<(), anyhow::Error> {
70 client
71 .create_topic(
72 &self.create_topic.stream_id,
73 &self.create_topic.name,
74 self.create_topic.partitions_count,
75 self.create_topic.compression_algorithm,
76 self.create_topic.replication_factor,
77 self.create_topic.message_expiry,
78 self.create_topic.max_topic_size,
79 )
80 .await
81 .with_context(|| {
82 format!(
83 "Problem creating topic (name: {}, partitions count: {}) in stream with ID: {}",
84 self.create_topic.name,
85 self.create_topic.partitions_count,
86 self.create_topic.stream_id
87 )
88 })?;
89
90 event!(target: PRINT_TARGET, Level::INFO,
91 "Topic with name: {}, partitions count: {}, compression algorithm: {}, message expiry: {}, max topic size: {}, replication factor: {} created in stream with ID: {}",
92 self.create_topic.name,
93 self.create_topic.partitions_count,
94 self.create_topic.compression_algorithm,
95 self.message_expiry,
96 self.max_topic_size,
97 self.replication_factor,
98 self.create_topic.stream_id,
99 );
100
101 Ok(())
102 }
103}
104
105impl fmt::Display for CreateTopicCmd {
106 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
107 let topic_name = &self.create_topic.name;
108 let compression_algorithm = &self.create_topic.compression_algorithm;
109 let message_expiry = &self.message_expiry;
110 let max_topic_size = &self.max_topic_size;
111 let replication_factor = self.replication_factor;
112 let stream_id = &self.create_topic.stream_id;
113
114 write!(
115 f,
116 "create topic with name: {topic_name}, message expiry: {message_expiry}, compression algorithm: {compression_algorithm}, \
117 max topic size: {max_topic_size}, replication factor: {replication_factor} in stream with ID: {stream_id}",
118 )
119 }
120}