tansu_cli/
cli.rs

1// Copyright ⓒ 2024-2025 Peter Morgan <peter.james.morgan@gmail.com>
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Tansu CLI
16//!
17//! The CLI is a single statically linked binary that contains:
18//! - Broker
19//! - Cat: produce, validate (if backed by a schema) and fetch messages
//! - Generator: use fake data generators to produce messages with a rate limit
//! - Perf: Performance
21//! - Proxy: a Kafka API proxy
22//! - Topic: Topic administration
23
24use std::process;
25
26use crate::Result;
27use clap::{Parser, Subcommand};
28use tansu_sans_io::ErrorCode;
29use tracing::debug;
30
31mod broker;
32mod cat;
33mod generator;
34mod perf;
35mod proxy;
36mod topic;
37
/// Broker URL pointing at TCP port 9092 on localhost.
///
/// NOTE(review): not referenced by the code visible in this file — presumably
/// the subcommand modules use it as their default broker address; confirm.
const DEFAULT_BROKER: &str = "tcp://localhost:9092";
39
/// Names of the storage engines compiled into this binary.
///
/// Each name is present only when the Cargo feature of the same name is
/// enabled. The listing order is fixed: `dynostore`, `libsql`, `postgres`,
/// `turso`. With none of those features enabled the result is empty.
fn storage_engines() -> Vec<&'static str> {
    // Entries whose feature is disabled are removed at compile time.
    const COMPILED_IN: &[&str] = &[
        #[cfg(feature = "dynostore")]
        "dynostore",
        #[cfg(feature = "libsql")]
        "libsql",
        #[cfg(feature = "postgres")]
        "postgres",
        #[cfg(feature = "turso")]
        "turso",
    ];

    COMPILED_IN.to_vec()
}
52
/// Names of the data lake integrations compiled into this binary.
///
/// `delta` and `iceberg` are each listed only when the Cargo feature of the
/// same name is enabled, in that order. With neither feature enabled the
/// result is empty.
fn lakes() -> Vec<&'static str> {
    // Entries whose feature is disabled are removed at compile time.
    const COMPILED_IN: &[&str] = &[
        #[cfg(feature = "delta")]
        "delta",
        #[cfg(feature = "iceberg")]
        "iceberg",
    ];

    COMPILED_IN.to_vec()
}
61
62fn after_help() -> String {
63    [
64        format!("Storage engines: {}", storage_engines().join(", ")),
65        format!("Data lakes: {}", lakes().join(", ")),
66    ]
67    .join("\n")
68}
69
// Top-level command line definition for the `tansu` binary.
//
// Plain `//` comments are used on purpose: clap derive turns `///` doc
// comments into help text, so adding them here would change the CLI output.
//
// `after_help` appends the storage engine / data lake summary built by
// `after_help()` to the help output. `args_conflicts_with_subcommands` is a
// clap setting; see the clap documentation for its exact semantics.
#[derive(Clone, Debug, Parser)]
#[command(
    name = "tansu",
    version,
    about,
    long_about = None,
    after_help = after_help(),
    args_conflicts_with_subcommands = true
)]
pub struct Cli {
    // Selected subcommand, if any. `Cli::main` falls back to the broker when
    // this is `None`.
    #[command(subcommand)]
    command: Option<Command>,

    // Broker arguments accepted directly at the top level; `Cli::main` uses
    // them for the implicit broker command when no subcommand is given.
    #[clap(flatten)]
    broker: broker::Arg,
}
86
// Subcommands of the `tansu` binary, dispatched by `Cli::main`.
//
// The `///` comment on each variant is picked up by clap derive as that
// subcommand's help text, so editing one changes what the CLI prints.
// `Cat` and `Topic` carry a nested subcommand; the other variants carry
// their argument struct in a `Box`.
#[derive(Clone, Debug, Subcommand)]
enum Command {
    /// Apache Kafka compatible broker with Avro, JSON, Protobuf schema validation [default if no command supplied]
    Broker(Box<broker::Arg>),

    /// Easily consume or produce Avro, JSON or Protobuf messages to a topic
    Cat {
        #[command(subcommand)]
        command: cat::Command,
    },

    /// Traffic Generator for schema backed topics
    Generator(Box<generator::Arg>),

    /// Performance
    Perf(Box<perf::Arg>),

    /// Apache Kafka compatible proxy
    Proxy(Box<proxy::Arg>),

    /// Create, list or delete topics managed by the broker
    Topic {
        #[command(subcommand)]
        command: topic::Command,
    },
}
113
114impl Cli {
115    pub async fn main() -> Result<ErrorCode> {
116        debug!(
117            pid = process::id(),
118            storage = ?storage_engines(),
119            lakes = ?lakes()
120        );
121
122        let cli = Cli::parse();
123
124        match cli.command.unwrap_or(Command::Broker(Box::new(cli.broker))) {
125            Command::Broker(arg) => arg
126                .main()
127                .await
128                .inspect(|result| debug!(?result))
129                .inspect_err(|err| debug!(?err)),
130
131            Command::Cat { command } => command.main().await,
132
133            Command::Generator(arg) => arg
134                .main()
135                .await
136                .inspect(|result| debug!(?result))
137                .inspect_err(|err| debug!(?err)),
138
139            Command::Perf(arg) => arg
140                .main()
141                .await
142                .inspect(|result| debug!(?result))
143                .inspect_err(|err| debug!(?err)),
144
145            Command::Proxy(arg) => tansu_proxy::Proxy::main(
146                arg.listener_url.into_inner(),
147                arg.advertised_listener_url.into_inner(),
148                arg.origin_url.into_inner(),
149                arg.otlp_endpoint_url
150                    .map(|otlp_endpoint_url| otlp_endpoint_url.into_inner()),
151            )
152            .await
153            .map_err(Into::into),
154
155            Command::Topic { command } => command.main().await,
156        }
157    }
158}