1use std::process;
25
26use crate::Result;
27use clap::{Parser, Subcommand};
28use tansu_sans_io::ErrorCode;
29use tracing::debug;
30
31mod broker;
32mod cat;
33mod generator;
34mod proxy;
35mod topic;
36
37const DEFAULT_BROKER: &str = "tcp://localhost:9092";
38
39fn storage_engines() -> Vec<&'static str> {
40 vec![
41 #[cfg(feature = "dynostore")]
42 "dynostore",
43 #[cfg(feature = "libsql")]
44 "libsql",
45 #[cfg(feature = "postgres")]
46 "postgres",
47 #[cfg(feature = "turso")]
48 "turso",
49 ]
50}
51
52fn lakes() -> Vec<&'static str> {
53 vec![
54 #[cfg(feature = "delta")]
55 "delta",
56 #[cfg(feature = "iceberg")]
57 "iceberg",
58 ]
59}
60
61fn after_help() -> String {
62 [
63 format!("Storage engines: {}", storage_engines().join(", ")),
64 format!("Data lakes: {}", lakes().join(", ")),
65 ]
66 .join("\n")
67}
68
69#[derive(Clone, Debug, Parser)]
70#[command(
71 name = "tansu",
72 version,
73 about,
74 long_about = None,
75 after_help = after_help(),
76 args_conflicts_with_subcommands = true
77)]
78pub struct Cli {
79 #[command(subcommand)]
80 command: Option<Command>,
81
82 #[clap(flatten)]
83 broker: broker::Arg,
84}
85
86#[derive(Clone, Debug, Subcommand)]
87enum Command {
88 Broker(Box<broker::Arg>),
90
91 Cat {
93 #[command(subcommand)]
94 command: cat::Command,
95 },
96
97 Generator(Box<generator::Arg>),
99
100 Proxy(Box<proxy::Arg>),
102
103 Topic {
105 #[command(subcommand)]
106 command: topic::Command,
107 },
108}
109
110impl Cli {
111 pub async fn main() -> Result<ErrorCode> {
112 debug!(
113 pid = process::id(),
114 storage = ?storage_engines(),
115 lakes = ?lakes()
116 );
117
118 let cli = Cli::parse();
119
120 match cli.command.unwrap_or(Command::Broker(Box::new(cli.broker))) {
121 Command::Broker(arg) => arg
122 .main()
123 .await
124 .inspect(|result| debug!(?result))
125 .inspect_err(|err| debug!(?err)),
126
127 Command::Cat { command } => command.main().await,
128
129 Command::Generator(arg) => arg
130 .main()
131 .await
132 .inspect(|result| debug!(?result))
133 .inspect_err(|err| debug!(?err)),
134
135 Command::Proxy(arg) => tansu_proxy::Proxy::main(
136 arg.listener_url.into_inner(),
137 arg.advertised_listener_url.into_inner(),
138 arg.origin_url.into_inner(),
139 arg.otlp_endpoint_url
140 .map(|otlp_endpoint_url| otlp_endpoint_url.into_inner()),
141 )
142 .await
143 .map_err(Into::into),
144
145 Command::Topic { command } => command.main().await,
146 }
147 }
148}