Skip to main content

tansu_cli/
cli.rs

1// Copyright ⓒ 2024-2026 Peter Morgan <peter.james.morgan@gmail.com>
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Tansu CLI
16//!
17//! The CLI is a single statically linked binary that contains:
//! - Broker
//! - Cat: produce, validate (if backed by a schema) and fetch messages
//! - Generator: use fake data generators to produce messages with a rate limit
//! - Perf: performance (see the `perf` module)
//! - Proxy: a Kafka API proxy
//! - Topic: Topic administration
//! - User: User administration
23
24use std::process;
25
26use crate::Result;
27use clap::{Parser, Subcommand};
28use tansu_sans_io::ErrorCode;
29use tracing::debug;
30
31mod broker;
32mod cat;
33mod generator;
34mod perf;
35mod proxy;
36mod topic;
37mod user;
38
/// Broker URL literal (`tcp://localhost:9092`).
///
/// NOTE(review): not referenced in the visible part of this file; presumably
/// the default broker address used by the subcommand modules — confirm.
const DEFAULT_BROKER: &str = "tcp://localhost:9092";
40
/// Names of the storage engines compiled into this binary.
///
/// Each name is present only when the Cargo feature of the same name is
/// enabled; the order is fixed (alphabetical, as listed below).
fn storage_engines() -> Vec<&'static str> {
    // `mut` is unused when every storage feature is disabled.
    #[allow(unused_mut)]
    let mut engines = Vec::new();

    #[cfg(feature = "dynostore")]
    engines.push("dynostore");

    #[cfg(feature = "libsql")]
    engines.push("libsql");

    #[cfg(feature = "postgres")]
    engines.push("postgres");

    #[cfg(feature = "slatedb")]
    engines.push("slatedb");

    #[cfg(feature = "turso")]
    engines.push("turso");

    engines
}
55
/// Names of the data lake integrations compiled into this binary.
///
/// A name is included only when the Cargo feature of the same name is
/// enabled; the order follows the table below.
fn lakes() -> Vec<&'static str> {
    // `cfg!` evaluates to a compile-time boolean for each feature.
    [
        ("delta", cfg!(feature = "delta")),
        ("iceberg", cfg!(feature = "iceberg")),
    ]
    .iter()
    .filter_map(|&(name, enabled)| enabled.then_some(name))
    .collect()
}
64
65fn after_help() -> String {
66    [
67        format!("Storage engines: {}", storage_engines().join(", ")),
68        format!("Data lakes: {}", lakes().join(", ")),
69    ]
70    .join("\n")
71}
72
73#[derive(Clone, Debug, Parser)]
74#[command(
75    name = "tansu",
76    version,
77    about,
78    long_about = None,
79    after_help = after_help(),
80    args_conflicts_with_subcommands = true
81)]
82pub struct Cli {
83    #[command(subcommand)]
84    command: Option<Command>,
85
86    #[clap(flatten)]
87    broker: broker::Arg,
88}
89
// Top-level subcommands of the `tansu` binary.
//
// NOTE: the `///` comments on the variants are consumed by clap's derive macro
// as the per-subcommand help text, so they are user-facing output. Maintainer
// notes in this enum therefore use plain `//` comments.
#[derive(Clone, Debug, Subcommand)]
enum Command {
    // `Cli::main` also builds this variant from the top-level broker
    // arguments when no subcommand is supplied.
    /// Apache Kafka compatible broker with Avro, JSON, Protobuf schema validation [default if no command supplied]
    Broker(Box<broker::Arg>),

    // Carries a nested subcommand defined in the `cat` module.
    /// Easily consume or produce Avro, JSON or Protobuf messages to a topic
    Cat {
        #[command(subcommand)]
        command: cat::Command,
    },

    /// Traffic Generator for schema backed topics
    Generator(Box<generator::Arg>),

    /// Performance
    Perf(Box<perf::Arg>),

    // `Cli::main` forwards these arguments to `tansu_proxy::Proxy::main`.
    /// Apache Kafka compatible proxy
    Proxy(Box<proxy::Arg>),

    // Carries a nested subcommand defined in the `topic` module.
    /// Create, list or delete topics managed by the broker
    Topic {
        #[command(subcommand)]
        command: topic::Command,
    },

    // Carries a nested subcommand defined in the `user` module.
    /// Create, list or delete users managed by the broker
    User {
        #[command(subcommand)]
        command: user::Command,
    },
}
122
123impl Cli {
124    pub async fn main() -> Result<ErrorCode> {
125        debug!(
126            pid = process::id(),
127            storage = ?storage_engines(),
128            lakes = ?lakes()
129        );
130
131        let cli = Cli::parse();
132
133        match cli.command.unwrap_or(Command::Broker(Box::new(cli.broker))) {
134            Command::Broker(arg) => arg.main().await,
135            Command::Cat { command } => command.main().await,
136            Command::Generator(arg) => arg.main().await,
137            Command::Perf(arg) => arg.main().await,
138            Command::Proxy(arg) => tansu_proxy::Proxy::main(
139                arg.listener_url.into_inner(),
140                arg.advertised_listener_url.into_inner(),
141                arg.origin_url.into_inner(),
142                arg.otlp_endpoint_url
143                    .map(|otlp_endpoint_url| otlp_endpoint_url.into_inner()),
144            )
145            .await
146            .map_err(Into::into),
147            Command::Topic { command } => command.main().await,
148            Command::User { command } => command.main().await,
149        }
150    }
151}