Skip to main content

iggy_cli/commands/binary_topics/
get_topics.rs

1/* Licensed to the Apache Software Foundation (ASF) under one
2 * or more contributor license agreements.  See the NOTICE file
3 * distributed with this work for additional information
4 * regarding copyright ownership.  The ASF licenses this file
5 * to you under the Apache License, Version 2.0 (the
6 * "License"); you may not use this file except in compliance
7 * with the License.  You may obtain a copy of the License at
8 *
9 *   http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied.  See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18
19use crate::commands::cli_command::{CliCommand, PRINT_TARGET};
20use anyhow::Context;
21use async_trait::async_trait;
22use comfy_table::Table;
23use iggy_common::Client;
24use iggy_common::{Identifier, IggyExpiry};
25use std::fmt::{self, Display, Formatter};
26use tracing::{Level, event};
27
/// Output format used when printing the topics of a stream.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GetTopicsOutput {
    /// Print all topics as a single table with a header row.
    Table,
    /// Print one `|`-separated line per topic.
    List,
}
32
33impl Display for GetTopicsOutput {
34    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
35        match self {
36            GetTopicsOutput::Table => write!(f, "table"),
37            GetTopicsOutput::List => write!(f, "list"),
38        }?;
39
40        Ok(())
41    }
42}
43
44pub struct GetTopicsCmd {
45    stream_id: Identifier,
46    output: GetTopicsOutput,
47}
48
49impl GetTopicsCmd {
50    pub fn new(stream_id: Identifier, output: GetTopicsOutput) -> Self {
51        Self { stream_id, output }
52    }
53}
54
55#[async_trait]
56impl CliCommand for GetTopicsCmd {
57    fn explain(&self) -> String {
58        format!(
59            "list topics from stream with ID: {} in {} mode",
60            self.stream_id, self.output
61        )
62    }
63
64    async fn execute_cmd(&mut self, client: &dyn Client) -> anyhow::Result<(), anyhow::Error> {
65        let topics = client
66            .get_topics(&self.stream_id)
67            .await
68            .with_context(|| format!("Problem getting topics from stream {}", self.stream_id))?;
69
70        match self.output {
71            GetTopicsOutput::Table => {
72                let mut table = Table::new();
73
74                table.set_header(vec![
75                    "ID",
76                    "Created",
77                    "Name",
78                    "Size (B)",
79                    "Max Topic Size (B)",
80                    "Compression",
81                    "Message Expiry (s)",
82                    "Messages Count",
83                    "Partitions Count",
84                ]);
85
86                topics.iter().for_each(|topic| {
87                    table.add_row(vec![
88                        format!("{}", topic.id),
89                        topic.created_at.to_utc_string("%Y-%m-%d %H:%M:%S"),
90                        topic.name.clone(),
91                        format!("{}", topic.size),
92                        format!("{}", topic.max_topic_size),
93                        topic.compression_algorithm.to_string(),
94                        match topic.message_expiry {
95                            IggyExpiry::NeverExpire => String::from("unlimited"),
96                            IggyExpiry::ServerDefault => String::from("server_default"),
97                            IggyExpiry::ExpireDuration(value) => format!("{value}"),
98                        },
99                        format!("{}", topic.messages_count),
100                        format!("{}", topic.partitions_count),
101                    ]);
102                });
103
104                event!(target: PRINT_TARGET, Level::INFO, "{table}");
105            }
106            GetTopicsOutput::List => {
107                topics.iter().for_each(|topic| {
108                    event!(target: PRINT_TARGET, Level::INFO,
109                            "{}|{}|{}|{}|{}|{}|{}|{}|{}",
110                            topic.id,
111                            topic.created_at.to_utc_string("%Y-%m-%d %H:%M:%S"),
112                            topic.name,
113                            topic.size,
114                            topic.max_topic_size,
115                            topic.compression_algorithm.to_string(),
116                            match topic.message_expiry {
117                    IggyExpiry::NeverExpire => String::from("unlimited"),
118                    IggyExpiry::ServerDefault => String::from("server_default"),
119                    IggyExpiry::ExpireDuration(value) => format!("{value}"),
120                            },
121                            topic.messages_count,
122                            topic.partitions_count
123                        );
124                });
125            }
126        }
127
128        Ok(())
129    }
130}