1use std::process;
25
26use crate::Result;
27use clap::{Parser, Subcommand};
28use tansu_sans_io::ErrorCode;
29use tracing::debug;
30
31mod broker;
32mod cat;
33mod generator;
34mod perf;
35mod proxy;
36mod topic;
37
/// Default broker URL: plain TCP to port 9092 on localhost.
///
/// NOTE(review): not referenced in the visible part of this file; presumably
/// consumed by the subcommand modules declared above — confirm before changing.
const DEFAULT_BROKER: &str = "tcp://localhost:9092";
39
/// Names of the storage engines compiled into this build.
///
/// Each engine sits behind a Cargo feature of the same name; only engines
/// whose feature is enabled are returned, always in the order
/// `dynostore`, `libsql`, `postgres`, `turso`.
fn storage_engines() -> Vec<&'static str> {
    // `cfg!` resolves to a compile-time boolean, so this table is constant.
    const CANDIDATES: [(&str, bool); 4] = [
        ("dynostore", cfg!(feature = "dynostore")),
        ("libsql", cfg!(feature = "libsql")),
        ("postgres", cfg!(feature = "postgres")),
        ("turso", cfg!(feature = "turso")),
    ];

    CANDIDATES
        .iter()
        .filter(|(_, compiled_in)| *compiled_in)
        .map(|&(engine, _)| engine)
        .collect()
}
52
/// Names of the data lake integrations compiled into this build.
///
/// Each lake sits behind a Cargo feature of the same name; only lakes whose
/// feature is enabled are returned, always in the order `delta`, `iceberg`.
fn lakes() -> Vec<&'static str> {
    // `cfg!` resolves to a compile-time boolean, so this table is constant.
    const CANDIDATES: [(&str, bool); 2] = [
        ("delta", cfg!(feature = "delta")),
        ("iceberg", cfg!(feature = "iceberg")),
    ];

    CANDIDATES
        .iter()
        .filter(|(_, compiled_in)| *compiled_in)
        .map(|&(lake, _)| lake)
        .collect()
}
61
62fn after_help() -> String {
63 [
64 format!("Storage engines: {}", storage_engines().join(", ")),
65 format!("Data lakes: {}", lakes().join(", ")),
66 ]
67 .join("\n")
68}
69
// Top-level command line of the `tansu` binary, parsed with clap's derive API.
//
// NOTE: plain `//` comments are used here on purpose. clap's derive turns `///`
// doc comments into help/about text, so adding them would change CLI output.
//
// `after_help` is set from `after_help()`, which lists the storage engines and
// data lakes compiled into this build.
#[derive(Clone, Debug, Parser)]
#[command(
    name = "tansu",
    version,
    about,
    long_about = None,
    after_help = after_help(),
    args_conflicts_with_subcommands = true
)]
pub struct Cli {
    // Subcommand to run. When absent, `Cli::main` falls back to running the
    // broker with the flattened `broker` arguments below.
    #[command(subcommand)]
    command: Option<Command>,

    // Broker arguments accepted directly at the top level, without naming a
    // subcommand.
    #[clap(flatten)]
    broker: broker::Arg,
}
86
// Subcommands of the `tansu` binary; `Cli::main` dispatches on these variants.
//
// NOTE: plain `//` comments are used here on purpose. clap's derive turns `///`
// doc comments on variants into subcommand help text.
//
// NOTE(review): the argument structs are boxed, presumably to keep this enum
// small — confirm.
#[derive(Clone, Debug, Subcommand)]
enum Command {
    // Run the broker; also what `Cli::main` runs when no subcommand is given.
    Broker(Box<broker::Arg>),

    // Nested `cat` subcommands, run via `cat::Command::main`.
    Cat {
        #[command(subcommand)]
        command: cat::Command,
    },

    // Run the generator with its own arguments.
    Generator(Box<generator::Arg>),

    // Run the perf command with its own arguments.
    Perf(Box<perf::Arg>),

    // Run the proxy; its URLs are forwarded to `tansu_proxy::Proxy::main`.
    Proxy(Box<proxy::Arg>),

    // Nested `topic` subcommands, run via `topic::Command::main`.
    Topic {
        #[command(subcommand)]
        command: topic::Command,
    },
}
113
114impl Cli {
115 pub async fn main() -> Result<ErrorCode> {
116 debug!(
117 pid = process::id(),
118 storage = ?storage_engines(),
119 lakes = ?lakes()
120 );
121
122 let cli = Cli::parse();
123
124 match cli.command.unwrap_or(Command::Broker(Box::new(cli.broker))) {
125 Command::Broker(arg) => arg
126 .main()
127 .await
128 .inspect(|result| debug!(?result))
129 .inspect_err(|err| debug!(?err)),
130
131 Command::Cat { command } => command.main().await,
132
133 Command::Generator(arg) => arg
134 .main()
135 .await
136 .inspect(|result| debug!(?result))
137 .inspect_err(|err| debug!(?err)),
138
139 Command::Perf(arg) => arg
140 .main()
141 .await
142 .inspect(|result| debug!(?result))
143 .inspect_err(|err| debug!(?err)),
144
145 Command::Proxy(arg) => tansu_proxy::Proxy::main(
146 arg.listener_url.into_inner(),
147 arg.advertised_listener_url.into_inner(),
148 arg.origin_url.into_inner(),
149 arg.otlp_endpoint_url
150 .map(|otlp_endpoint_url| otlp_endpoint_url.into_inner()),
151 )
152 .await
153 .map_err(Into::into),
154
155 Command::Topic { command } => command.main().await,
156 }
157 }
158}