1use crate::client::run_client;
2use crate::commands::{Commands, ServiceAction};
3pub use crate::config::{ClientConfig, ClientOpts};
4use crate::ping;
5use crate::service::{create_service_manager, ServiceStatus};
6use crate::shell::get_cache_dir;
7use anyhow::{bail, Context, Result};
8use clap::Parser;
9use cloudpub_common::logging::{init_log, WorkerGuard};
10use cloudpub_common::protocol::message::Message;
11use cloudpub_common::protocol::{
12 ConnectState, EndpointClear, EndpointList, EndpointRemove, EndpointStart, EndpointStartAll,
13 EndpointStop, ErrorKind, PerformUpgrade, Stop,
14};
15use cloudpub_common::{LONG_VERSION, VERSION};
16use dirs::cache_dir;
17use futures::future::FutureExt;
18use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
19use parking_lot::RwLock;
20use std::collections::HashMap;
21use std::io::{self, IsTerminal, Write};
22use std::path::PathBuf;
23use std::sync::Arc;
24use tokio::sync::mpsc;
25use tracing::{debug, error, warn};
26
/// File name of the client configuration that `init` loads via `ClientConfig::load`
/// when no explicit `--conf` path is given on the command line.
const CONFIG_FILE: &str = "client.toml";
28
29#[derive(Parser, Debug)]
30#[command(about, version(VERSION), long_version(LONG_VERSION))]
31pub struct Cli {
32 #[clap(subcommand)]
33 pub command: Commands,
34 #[clap(short, long, default_value = "debug", help = "Log level")]
35 pub log_level: String,
36 #[clap(short, long, default_value = "false", help = "Ouput log to console")]
37 pub verbose: bool,
38 #[clap(short, long, help = "Path to the config file")]
39 pub conf: Option<String>,
40 #[clap(short, long, default_value = "false", help = "Read-only config mode")]
41 pub readonly: bool,
42}
43
44fn handle_service_command(action: &ServiceAction, config: &ClientConfig) -> Result<()> {
45 let config_path = match action {
47 ServiceAction::Install { conf: Some(path) } => {
48 PathBuf::from(path)
50 }
51 _ => {
52 config.get_config_path().to_owned()
54 }
55 };
56
57 let actual_config = if config.token.is_some() {
59 debug!("Service config has token: {:?}", config_path);
60 config_path.clone()
61 } else {
62 #[cfg(any(target_os = "linux", target_os = "macos"))]
63 {
64 use crate::service::ServiceConfig;
65 if let Some(sudo_config) = ServiceConfig::get_sudo_user_config_path() {
67 sudo_config.clone()
68 } else {
69 config_path.clone()
70 }
71 }
72 #[cfg(not(any(target_os = "linux", target_os = "macos")))]
73 config_path.clone()
74 };
75
76 if actual_config.as_os_str().is_empty() || !actual_config.exists() {
77 bail!(crate::t!("error-config-not-found", "path" => format!("{:?}", config_path)));
78 }
79
80 let cfg = ClientConfig::from_file(&actual_config, true)
81 .context(crate::t!("error-config-load-failed", "path" => format!("{:?}", actual_config)))?;
82
83 if cfg.token.is_none() {
84 bail!(crate::t!("error-config-token-missing"));
85 }
86 let service_manager = create_service_manager(actual_config);
88
89 match action {
90 ServiceAction::Install { .. } => {
91 service_manager.install()?;
92 println!("{}", crate::t!("service-installed"));
93 }
94 ServiceAction::Uninstall => {
95 service_manager.uninstall()?;
96 println!("{}", crate::t!("service-uninstalled"));
97 }
98 ServiceAction::Start => {
99 service_manager.start()?;
100 println!("{}", crate::t!("service-started"));
101 }
102 ServiceAction::Stop => {
103 service_manager.stop()?;
104 println!("{}", crate::t!("service-stopped-service"));
105 }
106 ServiceAction::Status => {
107 let status = service_manager.status()?;
108 match status {
109 ServiceStatus::Running => println!("{}", crate::t!("service-running")),
110 ServiceStatus::Stopped => println!("{}", crate::t!("service-stopped-status")),
111 ServiceStatus::NotInstalled => println!("{}", crate::t!("service-not-installed")),
112 ServiceStatus::Unknown => println!("{}", crate::t!("service-status-unknown")),
113 }
114 }
115 }
116
117 Ok(())
118}
119
120pub fn init(args: &Cli) -> Result<(WorkerGuard, Arc<RwLock<ClientConfig>>)> {
121 if let Err(err) = fdlimit::raise_fd_limit() {
123 warn!("Failed to raise file descriptor limit: {}", err);
124 }
125
126 let log_dir = cache_dir().context("Can't get cache dir")?.join("cloudpub");
128 std::fs::create_dir_all(&log_dir).context("Can't create log dir")?;
129
130 let log_file = log_dir.join("client.log");
131
132 let guard = init_log(
133 &args.log_level,
134 &log_file,
135 args.verbose,
136 10 * 1024 * 1024,
137 2,
138 )
139 .context("Failed to initialize logging")?;
140
141 let config = if let Some(path) = args.conf.as_ref() {
142 ClientConfig::from_file(&path.into(), args.readonly)?
143 } else {
144 ClientConfig::load(CONFIG_FILE, true, args.readonly)?
145 };
146 let config = Arc::new(RwLock::new(config));
147 Ok((guard, config))
148}
149
150#[tokio::main]
151pub async fn cli_main(cli: Cli, config: Arc<RwLock<ClientConfig>>) -> Result<()> {
152 ctrlc::set_handler(move || {
153 std::process::exit(1);
154 })
155 .context("Error setting Ctrl-C handler")?;
156
157 let (command_tx, command_rx) = mpsc::channel(1024);
158 main_loop(cli, config, command_tx, command_rx).await
159}
160
161fn make_spinner(msg: String) -> ProgressBar {
162 let spinner = ProgressBar::new_spinner();
163 let style = ProgressStyle::default_spinner()
164 .template("{spinner} {msg}")
165 .unwrap();
166 #[cfg(target_os = "windows")]
167 let style = style.tick_chars("-\\|/ ");
168 spinner.set_style(style);
169 spinner.set_message(msg);
170 #[cfg(unix)]
171 spinner.enable_steady_tick(std::time::Duration::from_millis(100));
172 spinner
173}
174
175pub async fn main_loop(
176 mut cli: Cli,
177 config: Arc<RwLock<ClientConfig>>,
178 command_tx: mpsc::Sender<Message>,
179 command_rx: mpsc::Receiver<Message>,
180) -> Result<()> {
181 let mut opts = ClientOpts::default();
182 let write_stdout = |res: String| {
183 println!("{}", res);
184 };
185
186 let write_stderr = |res: String| {
187 eprintln!("{}", res);
188 };
189
190 let (result_tx, mut result_rx) = mpsc::channel(1024);
191
192 let mut pings = 1;
193
194 opts.secondary = match &mut cli.command {
195 Commands::Set(set_args) => {
196 config.write().set(&set_args.key, &set_args.value)?;
197 return Ok(());
198 }
199 Commands::Get(get_args) => {
200 let value = config.read().get(&get_args.key)?;
201 write_stdout(value);
202 return Ok(());
203 }
204 Commands::Options => {
205 let options = config.read().get_all_options();
206 let mut output = String::new();
207 for (key, value) in options {
208 output.push_str(&format!("{}: {}\n", key, value));
209 }
210 write_stdout(output.trim_end().to_string());
211 return Ok(());
212 }
213 Commands::Purge => {
214 let cache_dir = get_cache_dir("")?;
215 debug!(
216 "{}",
217 crate::t!("purge-cache-dir", "path" => cache_dir.to_str().unwrap())
218 );
219 std::fs::remove_dir_all(&cache_dir).ok();
220 return Ok(());
221 }
222 Commands::Login(args) => {
223 let email = match &args.email {
224 Some(email) => email.clone(),
225 None => {
226 print!("{}", crate::t!("enter-email"));
228 std::io::stdout().flush().ok();
229 let mut email = String::new();
230 std::io::stdin().read_line(&mut email)?;
231 email.trim().to_string()
232 }
233 };
234
235 let password = match &args.password {
236 Some(pwd) => pwd.clone(),
237 None => {
238 if let Ok(pwd) = std::env::var("PASSWORD") {
240 pwd
241 } else {
242 print!("{}", crate::t!("enter-password"));
244 std::io::stdout().flush().ok();
245 rpassword::read_password().unwrap_or_default()
246 }
247 }
248 };
249 opts.credentials = Some((email, password));
250 true
251 }
252 Commands::Logout => {
253 config.write().token = None;
254 config
255 .write()
256 .save()
257 .context("Failed to save config after logout")?;
258 write_stderr(crate::t!("session-terminated"));
259 return Ok(());
260 }
261 Commands::Register(publish_args) => {
262 config.read().validate()?;
263 publish_args.parse()?;
264 true
265 }
266 Commands::Publish(publish_args) => {
267 config.read().validate()?;
268 publish_args.parse()?;
269 false
270 }
271
272 Commands::Ping(_) => {
273 opts.transient = true;
274 true
275 }
276
277 Commands::Run { run_as_service } => {
278 opts.is_service = *run_as_service;
279 false
280 }
281
282 Commands::Unpublish(_)
283 | Commands::Start(_)
284 | Commands::Stop(_)
285 | Commands::Break
286 | Commands::Ls
287 | Commands::Clean
288 | Commands::Upgrade => {
289 config.read().validate()?;
290 true
291 }
292 Commands::Service { action } => {
293 return handle_service_command(action, &config.read());
294 }
295 };
296
297 debug!("Config: {:?}", config);
298
299 tokio::spawn(async move {
300 if let Err(err) = run_client(config.clone(), opts, command_rx, result_tx)
301 .boxed()
302 .await
303 {
304 error!("Error running client: {:?}", err);
305 }
306 });
307
308 let mut current_spinner = None;
309 let multi_progress = MultiProgress::new();
310 let mut progress_bars: HashMap<String, ProgressBar> = HashMap::new();
311
312 loop {
313 match result_rx
314 .recv()
315 .await
316 .context("Failed to receive message")?
317 {
318 Message::Error(err) => {
319 let kind: ErrorKind = err.kind.try_into().unwrap_or(ErrorKind::Fatal);
320 if kind == ErrorKind::Fatal || kind == ErrorKind::AuthFailed {
321 command_tx.send(Message::Stop(Stop {})).await.ok();
322 bail!("{}", err.message);
323 } else {
324 write_stderr(err.message.to_string());
325 }
326 }
327
328 Message::UpgradeAvailable(info) => match cli.command {
329 Commands::Upgrade => {
330 command_tx
331 .send(Message::PerformUpgrade(PerformUpgrade {
332 version: info.version.clone(),
333 }))
334 .await
335 .context("Failed to send upgrade message")?;
336 }
337 Commands::Run { .. } | Commands::Publish(_) => {
338 write_stderr(crate::t!("upgrade-available", "version" => info.version.clone()));
339 }
340 _ => {}
341 },
342
343 Message::EndpointAck(endpoint) => {
344 if endpoint.status == Some("online".to_string()) {
345 match cli.command {
346 Commands::Ping(ref args) => {
347 current_spinner = Some(make_spinner(crate::t!("measuring-speed")));
348 let stats = ping::ping_test(endpoint, args.bare).await?;
349 current_spinner.take();
350 if args.bare {
351 write_stdout(stats.to_string());
352 } else {
353 write_stdout(stats);
354 }
355 pings -= 1;
356 if pings == 0 {
357 break;
358 }
359 }
360 Commands::Register(_) => {
361 write_stdout(
362 crate::t!("service-registered", "endpoint" => endpoint.to_string()),
363 );
364 break;
365 }
366 Commands::Publish(_) | Commands::Run { .. } => {
367 if endpoint.error.is_empty() {
368 write_stdout(
369 crate::t!("service-published", "endpoint" => endpoint.to_string()),
370 )
371 } else {
372 write_stdout(
373 crate::t!("service-error", "endpoint" => endpoint.to_string()),
374 )
375 }
376 }
377 _ => {}
378 }
379 }
380 }
381
382 Message::EndpointStopAck(ep) => {
383 write_stdout(crate::t!("service-stopped", "guid" => ep.guid));
384 if matches!(cli.command, Commands::Unpublish(_)) {
385 break;
386 }
387 }
388
389 Message::EndpointRemoveAck(ep) => {
390 write_stdout(crate::t!("service-removed", "guid" => ep.guid));
391 if matches!(cli.command, Commands::Unpublish(_)) {
392 break;
393 }
394 }
395
396 Message::ConnectState(st) => match st.try_into().unwrap_or(ConnectState::Connecting) {
397 ConnectState::Connecting => {
398 current_spinner = Some(make_spinner(crate::t!("connecting")));
399 }
400
401 ConnectState::Connected => {
402 if let Some(spinner) = current_spinner.take() {
403 spinner.finish_and_clear();
404 }
405
406 match cli.command {
407 Commands::Ls => {
408 command_tx
409 .send(Message::EndpointList(EndpointList {}))
410 .await?;
411 }
412 Commands::Clean => {
413 command_tx
414 .send(Message::EndpointClear(EndpointClear {}))
415 .await?;
416 }
417 Commands::Run { .. } => {
418 command_tx
419 .send(Message::EndpointStartAll(EndpointStartAll {}))
420 .await?;
421 }
422 Commands::Publish(ref endpoint) => {
423 command_tx
424 .send(Message::EndpointStart(endpoint.parse()?))
425 .await?;
426 }
427 Commands::Register(ref endpoint) => {
428 command_tx
429 .send(Message::EndpointStart(endpoint.parse()?))
430 .await?;
431 }
432 Commands::Unpublish(ref args) => {
433 command_tx
434 .send(Message::EndpointRemove(EndpointRemove {
435 guid: args.guid.clone(),
436 }))
437 .await?;
438 }
439 Commands::Start(ref args) => {
440 command_tx
441 .send(Message::EndpointGuidStart(EndpointStart {
442 guid: args.guid.clone(),
443 }))
444 .await?;
445 }
446 Commands::Stop(ref args) => {
447 command_tx
448 .send(Message::EndpointStop(EndpointStop {
449 guid: args.guid.clone(),
450 }))
451 .await?;
452 }
453 Commands::Ping(ref args) => {
454 pings = args.num.unwrap_or(1);
455 for _i in 0..pings {
456 ping::publish(command_tx.clone()).await?;
457 }
458 }
459 Commands::Login(_) => {
460 write_stdout(crate::t!("client-authorized"));
461 break;
462 }
463 _ => {}
464 }
465 }
466 ConnectState::Disconnected => {
467 if let Some(spinner) = current_spinner.take() {
468 spinner.finish_and_clear();
469 }
470 }
471 },
472
473 Message::Progress(info) => {
474 let progress_guid = if info.guid.is_empty() {
475 "default"
476 } else {
477 &info.guid
478 };
479
480 if info.current == 0 {
481 let bar = multi_progress.add(ProgressBar::new(info.total as u64));
483 bar.set_message(info.message.clone());
484 bar.set_style(ProgressStyle::default_bar().template(&info.template)?);
485 progress_bars.insert(progress_guid.to_string(), bar);
486 } else if info.current >= info.total {
487 if let Some(progress_bar) = progress_bars.remove(progress_guid) {
489 progress_bar.finish_and_clear();
490 }
491 } else if let Some(bar) = progress_bars.get(progress_guid) {
492 bar.set_position(info.current as u64);
494 if !info.message.is_empty() {
495 bar.set_message(info.message.clone());
496 }
497 }
498 }
499
500 Message::EndpointListAck(list) => {
501 if list.endpoints.is_empty() {
502 write_stdout(crate::t!("no-registered-services"));
503 } else {
504 let mut output = String::new();
505 let use_colors = io::stderr().is_terminal();
506
507 for ep in &list.endpoints {
508 let status = ep.status.as_deref().unwrap_or("unknown");
509 let colored_status = if use_colors {
510 match status {
511 "online" => format!("\x1b[32m{}\x1b[0m ", status), "offline" => format!("\x1b[31m{}\x1b[0m", status), "starting" => format!("\x1b[33m{}\x1b[0m", status), "stopping" => format!("\x1b[33m{}\x1b[0m", status), "error" => format!("\x1b[31m{}\x1b[0m ", status), _ => status.to_string(),
517 }
518 } else {
519 status.to_string()
520 };
521
522 output.push_str(&format!("{} {} {}\n", colored_status, ep.guid, ep));
523 }
524 write_stdout(output);
525 }
526 if !matches!(cli.command, Commands::Run { .. }) {
527 break;
528 }
529 }
530
531 Message::EndpointClearAck(_) => {
532 write_stdout(crate::t!("all-services-removed"));
533 if !matches!(cli.command, Commands::Run { .. }) {
534 break;
535 }
536 }
537
538 other => {
539 debug!("Unhandled message: {:?}", other);
540 }
541 }
542 }
543
544 command_tx.send(Message::Stop(Stop {})).await.ok();
545
546 Ok(())
547}