tansu_cli/
cli.rs

1// Copyright ⓒ 2024-2025 Peter Morgan <peter.james.morgan@gmail.com>
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Tansu CLI
16//!
17//! The CLI is a single statically linked binary that contains:
18//! - Broker
19//! - Cat: produce, validate (if backed by a schema) and fetch messages
20//! - Generator: use fake data generators to produce messages with a rate limit
//! - Perf: Performance
//! - Proxy: a Kafka API proxy
22//! - Topic: Topic administration
23
24use std::process;
25
26use crate::Result;
27use clap::{Parser, Subcommand};
28use tansu_sans_io::ErrorCode;
29use tracing::debug;
30
31mod broker;
32mod cat;
33mod generator;
34mod perf;
35mod proxy;
36mod topic;
37
/// Default broker URL: TCP to `localhost` on port 9092.
///
/// NOTE(review): not referenced in the visible part of this file; presumably
/// used by the submodules declared above — confirm before changing or removing.
const DEFAULT_BROKER: &str = "tcp://localhost:9092";
39
/// Names of the storage engines compiled into this binary.
///
/// Each name is present only when the Cargo feature of the same name is
/// enabled, so the result may be empty. Names keep their declaration order.
fn storage_engines() -> Vec<&'static str> {
    // Entries whose feature is disabled are stripped at compile time.
    let enabled: &[&'static str] = &[
        #[cfg(feature = "dynostore")]
        "dynostore",
        #[cfg(feature = "libsql")]
        "libsql",
        #[cfg(feature = "postgres")]
        "postgres",
        #[cfg(feature = "slatedb")]
        "slatedb",
        #[cfg(feature = "turso")]
        "turso",
    ];

    enabled.to_vec()
}
54
/// Names of the data lake formats compiled into this binary.
///
/// Each name is present only when the Cargo feature of the same name is
/// enabled, so the result may be empty. Names keep their declaration order.
fn lakes() -> Vec<&'static str> {
    // Entries whose feature is disabled are stripped at compile time.
    let enabled: &[&'static str] = &[
        #[cfg(feature = "delta")]
        "delta",
        #[cfg(feature = "iceberg")]
        "iceberg",
    ];

    enabled.to_vec()
}
63
64fn after_help() -> String {
65    [
66        format!("Storage engines: {}", storage_engines().join(", ")),
67        format!("Data lakes: {}", lakes().join(", ")),
68    ]
69    .join("\n")
70}
71
72#[derive(Clone, Debug, Parser)]
73#[command(
74    name = "tansu",
75    version,
76    about,
77    long_about = None,
78    after_help = after_help(),
79    args_conflicts_with_subcommands = true
80)]
81pub struct Cli {
82    #[command(subcommand)]
83    command: Option<Command>,
84
85    #[clap(flatten)]
86    broker: broker::Arg,
87}
88
// Subcommands of the `tansu` binary, dispatched by `Cli::main`.
//
// With clap's derive the `///` comments on the variants below are used as
// the help text shown to users, so treat them as user-facing strings rather
// than ordinary code comments.
//
// NOTE(review): the argument payloads are boxed, presumably to keep the enum
// itself small — confirm before unboxing.
#[derive(Clone, Debug, Subcommand)]
enum Command {
    /// Apache Kafka compatible broker with Avro, JSON, Protobuf schema validation [default if no command supplied]
    Broker(Box<broker::Arg>),

    /// Easily consume or produce Avro, JSON or Protobuf messages to a topic
    Cat {
        // Nested `cat` subcommand, defined in the `cat` module.
        #[command(subcommand)]
        command: cat::Command,
    },

    /// Traffic Generator for schema backed topics
    Generator(Box<generator::Arg>),

    /// Performance
    Perf(Box<perf::Arg>),

    /// Apache Kafka compatible proxy
    Proxy(Box<proxy::Arg>),

    /// Create, list or delete topics managed by the broker
    Topic {
        // Nested `topic` subcommand, defined in the `topic` module.
        #[command(subcommand)]
        command: topic::Command,
    },
}
115
116impl Cli {
117    pub async fn main() -> Result<ErrorCode> {
118        debug!(
119            pid = process::id(),
120            storage = ?storage_engines(),
121            lakes = ?lakes()
122        );
123
124        let cli = Cli::parse();
125
126        match cli.command.unwrap_or(Command::Broker(Box::new(cli.broker))) {
127            Command::Broker(arg) => arg.main().await,
128            Command::Cat { command } => command.main().await,
129            Command::Generator(arg) => arg.main().await,
130            Command::Perf(arg) => arg.main().await,
131            Command::Proxy(arg) => tansu_proxy::Proxy::main(
132                arg.listener_url.into_inner(),
133                arg.advertised_listener_url.into_inner(),
134                arg.origin_url.into_inner(),
135                arg.otlp_endpoint_url
136                    .map(|otlp_endpoint_url| otlp_endpoint_url.into_inner()),
137            )
138            .await
139            .map_err(Into::into),
140            Command::Topic { command } => command.main().await,
141        }
142    }
143}