tansu_cli/
cli.rs

1// Copyright ⓒ 2024-2025 Peter Morgan <peter.james.morgan@gmail.com>
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Tansu CLI
16//!
17//! The CLI is a single statically linked binary that contains:
18//! - Broker
19//! - Cat: produce, validate (if backed by a schema) and fetch messages
20//! - Generator: use fake data generators to produce messages with a rate limit
21//! - Proxy: a Kafka API proxy
22//! - Topic: Topic administration
23
24use std::process;
25
26use crate::Result;
27use clap::{Parser, Subcommand};
28use tansu_sans_io::ErrorCode;
29use tracing::debug;
30
31mod broker;
32mod cat;
33mod generator;
34mod proxy;
35mod topic;
36
/// Broker URL pointing at TCP port 9092 on localhost.
///
/// NOTE(review): not referenced in this file; presumably the default broker
/// address used by the subcommand modules declared above — confirm.
const DEFAULT_BROKER: &str = "tcp://localhost:9092";
38
/// Names of the storage engines compiled into this binary.
///
/// Each entry is included only when the Cargo feature of the same name is
/// enabled, so the returned list is empty when none of them are.
fn storage_engines() -> Vec<&'static str> {
    Vec::from([
        #[cfg(feature = "dynostore")]
        "dynostore",
        #[cfg(feature = "libsql")]
        "libsql",
        #[cfg(feature = "postgres")]
        "postgres",
        #[cfg(feature = "turso")]
        "turso",
    ])
}
51
/// Names of the data lake formats compiled into this binary.
///
/// Each entry is included only when the Cargo feature of the same name is
/// enabled, so the returned list is empty when none of them are.
fn lakes() -> Vec<&'static str> {
    let enabled: &[&'static str] = &[
        #[cfg(feature = "delta")]
        "delta",
        #[cfg(feature = "iceberg")]
        "iceberg",
    ];

    enabled.to_vec()
}
60
61fn after_help() -> String {
62    [
63        format!("Storage engines: {}", storage_engines().join(", ")),
64        format!("Data lakes: {}", lakes().join(", ")),
65    ]
66    .join("\n")
67}
68
69#[derive(Clone, Debug, Parser)]
70#[command(
71    name = "tansu",
72    version,
73    about,
74    long_about = None,
75    after_help = after_help(),
76    args_conflicts_with_subcommands = true
77)]
78pub struct Cli {
79    #[command(subcommand)]
80    command: Option<Command>,
81
82    #[clap(flatten)]
83    broker: broker::Arg,
84}
85
// Subcommands of the `tansu` binary.
//
// The `///` comments on the variants double as the per-command help text
// rendered by clap, so they are user-facing strings rather than plain
// documentation; maintainer notes therefore use `//`.
//
// NOTE(review): argument structs are boxed, presumably to keep the enum
// small regardless of how large each `Arg` is — confirm.
#[derive(Clone, Debug, Subcommand)]
enum Command {
    /// Apache Kafka compatible broker with Avro, JSON, Protobuf schema validation [default if no command supplied]
    Broker(Box<broker::Arg>),

    /// Easily consume or produce Avro, JSON or Protobuf messages to a topic
    Cat {
        // Nested subcommand defined in the `cat` module.
        #[command(subcommand)]
        command: cat::Command,
    },

    /// Traffic Generator for schema backed topics
    Generator(Box<generator::Arg>),

    /// Apache Kafka compatible proxy
    Proxy(Box<proxy::Arg>),

    /// Create or delete topics managed by the broker
    Topic {
        // Nested subcommand defined in the `topic` module.
        #[command(subcommand)]
        command: topic::Command,
    },
}
109
110impl Cli {
111    pub async fn main() -> Result<ErrorCode> {
112        debug!(
113            pid = process::id(),
114            storage = ?storage_engines(),
115            lakes = ?lakes()
116        );
117
118        let cli = Cli::parse();
119
120        match cli.command.unwrap_or(Command::Broker(Box::new(cli.broker))) {
121            Command::Broker(arg) => arg
122                .main()
123                .await
124                .inspect(|result| debug!(?result))
125                .inspect_err(|err| debug!(?err)),
126
127            Command::Cat { command } => command.main().await,
128
129            Command::Generator(arg) => arg
130                .main()
131                .await
132                .inspect(|result| debug!(?result))
133                .inspect_err(|err| debug!(?err)),
134
135            Command::Proxy(arg) => tansu_proxy::Proxy::main(
136                arg.listener_url.into_inner(),
137                arg.advertised_listener_url.into_inner(),
138                arg.origin_url.into_inner(),
139                arg.otlp_endpoint_url
140                    .map(|otlp_endpoint_url| otlp_endpoint_url.into_inner()),
141            )
142            .await
143            .map_err(Into::into),
144
145            Command::Topic { command } => command.main().await,
146        }
147    }
148}