1use crate::client::run_client;
2use crate::commands::{Commands, ServiceAction};
3pub use crate::config::{ClientConfig, ClientOpts};
4use crate::ping;
5use crate::service::{create_service_manager, ServiceConfig, ServiceStatus};
6use crate::shell::get_cache_dir;
7use anyhow::{bail, Context, Result};
8use clap::Parser;
9use cloudpub_common::logging::{init_log, WorkerGuard};
10use cloudpub_common::protocol::message::Message;
11use cloudpub_common::protocol::{
12 ConnectState, EndpointClear, EndpointList, EndpointRemove, EndpointStart, EndpointStartAll,
13 EndpointStop, ErrorKind, PerformUpgrade, Stop,
14};
15use cloudpub_common::{LONG_VERSION, VERSION};
16use dirs::cache_dir;
17use futures::future::FutureExt;
18use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
19use parking_lot::RwLock;
20use std::collections::HashMap;
21use std::env;
22use std::io::{self, IsTerminal, Write};
23use std::sync::Arc;
24use tokio::sync::mpsc;
25use tracing::{debug, error, warn};
26
// Config file name handed to `ClientConfig::load` when no explicit `--conf` path is given.
const CONFIG_FILE: &str = "client.toml";
28
29#[derive(Parser, Debug)]
30#[command(about, version(VERSION), long_version(LONG_VERSION))]
31pub struct Cli {
32 #[clap(subcommand)]
33 pub command: Commands,
34 #[clap(short, long, default_value = "debug", help = "Log level")]
35 pub log_level: String,
36 #[clap(short, long, default_value = "false", help = "Ouput log to console")]
37 pub verbose: bool,
38 #[clap(short, long, help = "Path to the config file")]
39 pub conf: Option<String>,
40 #[clap(short, long, default_value = "false", help = "Read-only config mode")]
41 pub readonly: bool,
42}
43
44fn handle_service_command(action: &ServiceAction, config: &ClientConfig) -> Result<()> {
45 let exe_path = env::current_exe().context("Failed to get current executable path")?;
47
48 let mut args = Vec::new();
50 if let Some(path_str) = config.get_config_path().to_str() {
51 args.push("--conf".to_string());
52 args.push(path_str.to_string());
53 }
54
55 args.push("run".to_string());
57
58 #[cfg(target_os = "windows")]
60 {
61 args.push("--run-as-service".to_string());
62 }
63
64 let service_config = ServiceConfig {
66 #[cfg(target_os = "macos")]
67 name: "ru.cloudpub.clo".to_string(),
68 #[cfg(not(target_os = "macos"))]
69 name: "cloudpub".to_string(),
70 display_name: "CloudPub Client".to_string(),
71 description: "CloudPub Client Service".to_string(),
72 executable_path: exe_path,
73 args,
74 config_path: Some(config.get_config_path().to_owned()),
75 };
76
77 let service_manager = create_service_manager(service_config);
79
80 match action {
81 ServiceAction::Install => {
82 service_manager.install()?;
83 println!("{}", crate::t!("service-installed"));
84 }
85 ServiceAction::Uninstall => {
86 service_manager.uninstall()?;
87 println!("{}", crate::t!("service-uninstalled"));
88 }
89 ServiceAction::Start => {
90 service_manager.start()?;
91 println!("{}", crate::t!("service-started"));
92 }
93 ServiceAction::Stop => {
94 service_manager.stop()?;
95 println!("{}", crate::t!("service-stopped-service"));
96 }
97 ServiceAction::Status => {
98 let status = service_manager.status()?;
99 match status {
100 ServiceStatus::Running => println!("{}", crate::t!("service-running")),
101 ServiceStatus::Stopped => println!("{}", crate::t!("service-stopped-status")),
102 ServiceStatus::NotInstalled => println!("{}", crate::t!("service-not-installed")),
103 ServiceStatus::Unknown => println!("{}", crate::t!("service-status-unknown")),
104 }
105 }
106 }
107
108 Ok(())
109}
110
111pub fn init(args: &Cli) -> Result<(WorkerGuard, Arc<RwLock<ClientConfig>>)> {
112 if let Err(err) = fdlimit::raise_fd_limit() {
114 warn!("Failed to raise file descriptor limit: {}", err);
115 }
116
117 let log_dir = cache_dir().context("Can't get cache dir")?.join("cloudpub");
119 std::fs::create_dir_all(&log_dir).context("Can't create log dir")?;
120
121 let log_file = log_dir.join("client.log");
122
123 let guard = init_log(
124 &args.log_level,
125 &log_file,
126 args.verbose,
127 10 * 1024 * 1024,
128 2,
129 )
130 .context("Failed to initialize logging")?;
131
132 let config = if let Some(path) = args.conf.as_ref() {
133 ClientConfig::from_file(&path.into(), args.readonly)?
134 } else {
135 ClientConfig::load(CONFIG_FILE, true, args.readonly)?
136 };
137 let config = Arc::new(RwLock::new(config));
138 Ok((guard, config))
139}
140
141#[tokio::main]
142pub async fn cli_main(cli: Cli, config: Arc<RwLock<ClientConfig>>) -> Result<()> {
143 ctrlc::set_handler(move || {
144 std::process::exit(1);
145 })
146 .context("Error setting Ctrl-C handler")?;
147
148 let (command_tx, command_rx) = mpsc::channel(1024);
149 main_loop(cli, config, command_tx, command_rx).await
150}
151
152fn make_spinner(msg: String) -> ProgressBar {
153 let spinner = ProgressBar::new_spinner();
154 let style = ProgressStyle::default_spinner()
155 .template("{spinner} {msg}")
156 .unwrap();
157 #[cfg(target_os = "windows")]
158 let style = style.tick_chars("-\\|/ ");
159 spinner.set_style(style);
160 spinner.set_message(msg);
161 #[cfg(unix)]
162 spinner.enable_steady_tick(std::time::Duration::from_millis(100));
163 spinner
164}
165
166pub async fn main_loop(
167 mut cli: Cli,
168 config: Arc<RwLock<ClientConfig>>,
169 command_tx: mpsc::Sender<Message>,
170 command_rx: mpsc::Receiver<Message>,
171) -> Result<()> {
172 let mut opts = ClientOpts::default();
173 let write_stdout = |res: String| {
174 println!("{}", res);
175 };
176
177 let write_stderr = |res: String| {
178 eprintln!("{}", res);
179 };
180
181 let (result_tx, mut result_rx) = mpsc::channel(1024);
182
183 let mut pings = 1;
184
185 opts.secondary = match &mut cli.command {
186 Commands::Set(set_args) => {
187 config.write().set(&set_args.key, &set_args.value)?;
188 return Ok(());
189 }
190 Commands::Get(get_args) => {
191 let value = config.read().get(&get_args.key)?;
192 write_stdout(value);
193 return Ok(());
194 }
195 Commands::Options => {
196 let options = config.read().get_all_options();
197 let mut output = String::new();
198 for (key, value) in options {
199 output.push_str(&format!("{}: {}\n", key, value));
200 }
201 write_stdout(output.trim_end().to_string());
202 return Ok(());
203 }
204 Commands::Purge => {
205 let cache_dir = get_cache_dir("")?;
206 debug!(
207 "{}",
208 crate::t!("purge-cache-dir", "path" => cache_dir.to_str().unwrap())
209 );
210 std::fs::remove_dir_all(&cache_dir).ok();
211 return Ok(());
212 }
213 Commands::Login(args) => {
214 let email = match &args.email {
215 Some(email) => email.clone(),
216 None => {
217 print!("{}", crate::t!("enter-email"));
219 std::io::stdout().flush().ok();
220 let mut email = String::new();
221 std::io::stdin().read_line(&mut email)?;
222 email.trim().to_string()
223 }
224 };
225
226 let password = match &args.password {
227 Some(pwd) => pwd.clone(),
228 None => {
229 if let Ok(pwd) = std::env::var("PASSWORD") {
231 pwd
232 } else {
233 print!("{}", crate::t!("enter-password"));
235 std::io::stdout().flush().ok();
236 rpassword::read_password().unwrap_or_default()
237 }
238 }
239 };
240 opts.credentials = Some((email, password));
241 true
242 }
243 Commands::Logout => {
244 config.write().token = None;
245 config
246 .write()
247 .save()
248 .context("Failed to save config after logout")?;
249 write_stderr(crate::t!("session-terminated"));
250 return Ok(());
251 }
252 Commands::Register(publish_args) => {
253 config.read().validate()?;
254 publish_args.parse()?;
255 true
256 }
257 Commands::Publish(publish_args) => {
258 config.read().validate()?;
259 publish_args.parse()?;
260 false
261 }
262
263 Commands::Ping(_) => {
264 opts.transient = true;
265 true
266 }
267
268 Commands::Run => false,
269
270 Commands::Unpublish(_)
271 | Commands::Start(_)
272 | Commands::Stop(_)
273 | Commands::Break
274 | Commands::Ls
275 | Commands::Clean
276 | Commands::Upgrade => {
277 config.read().validate()?;
278 true
279 }
280 Commands::Service { action } => {
281 return handle_service_command(action, &config.read());
282 }
283 };
284
285 debug!("Config: {:?}", config);
286
287 tokio::spawn(async move {
288 if let Err(err) = run_client(config.clone(), opts, command_rx, result_tx)
289 .boxed()
290 .await
291 {
292 error!("Error running client: {:?}", err);
293 }
294 });
295
296 let mut current_spinner = None;
297 let multi_progress = MultiProgress::new();
298 let mut progress_bars: HashMap<String, ProgressBar> = HashMap::new();
299
300 loop {
301 match result_rx
302 .recv()
303 .await
304 .context("Failed to receive message")?
305 {
306 Message::Error(err) => {
307 let kind: ErrorKind = err.kind.try_into().unwrap_or(ErrorKind::Fatal);
308 if kind == ErrorKind::Fatal || kind == ErrorKind::AuthFailed {
309 command_tx.send(Message::Stop(Stop {})).await.ok();
310 bail!("{}", err.message);
311 } else {
312 write_stderr(err.message.to_string());
313 }
314 }
315
316 Message::UpgradeAvailable(info) => match cli.command {
317 Commands::Upgrade => {
318 command_tx
319 .send(Message::PerformUpgrade(PerformUpgrade {
320 version: info.version.clone(),
321 }))
322 .await
323 .context("Failed to send upgrade message")?;
324 }
325 Commands::Run | Commands::Publish(_) => {
326 write_stderr(crate::t!("upgrade-available", "version" => info.version.clone()));
327 }
328 _ => {}
329 },
330
331 Message::EndpointAck(endpoint) => {
332 if endpoint.status == Some("online".to_string()) {
333 match cli.command {
334 Commands::Ping(ref args) => {
335 current_spinner = Some(make_spinner(crate::t!("measuring-speed")));
336 let stats = ping::ping_test(endpoint, args.bare).await?;
337 current_spinner.take();
338 if args.bare {
339 write_stdout(stats.to_string());
340 } else {
341 write_stdout(stats);
342 }
343 pings -= 1;
344 if pings == 0 {
345 break;
346 }
347 }
348 Commands::Register(_) => {
349 write_stdout(
350 crate::t!("service-registered", "endpoint" => endpoint.to_string()),
351 );
352 break;
353 }
354 Commands::Publish(_) | Commands::Run => {
355 if endpoint.error.is_empty() {
356 write_stdout(
357 crate::t!("service-published", "endpoint" => endpoint.to_string()),
358 )
359 } else {
360 write_stdout(
361 crate::t!("service-error", "endpoint" => endpoint.to_string()),
362 )
363 }
364 }
365 _ => {}
366 }
367 }
368 }
369
370 Message::EndpointStopAck(ep) => {
371 write_stdout(crate::t!("service-stopped", "guid" => ep.guid));
372 if matches!(cli.command, Commands::Unpublish(_)) {
373 break;
374 }
375 }
376
377 Message::EndpointRemoveAck(ep) => {
378 write_stdout(crate::t!("service-removed", "guid" => ep.guid));
379 if matches!(cli.command, Commands::Unpublish(_)) {
380 break;
381 }
382 }
383
384 Message::ConnectState(st) => match st.try_into().unwrap_or(ConnectState::Connecting) {
385 ConnectState::Connecting => {
386 current_spinner = Some(make_spinner(crate::t!("connecting")));
387 }
388
389 ConnectState::Connected => {
390 if let Some(spinner) = current_spinner.take() {
391 spinner.finish_and_clear();
392 }
393
394 match cli.command {
395 Commands::Ls => {
396 command_tx
397 .send(Message::EndpointList(EndpointList {}))
398 .await?;
399 }
400 Commands::Clean => {
401 command_tx
402 .send(Message::EndpointClear(EndpointClear {}))
403 .await?;
404 }
405 Commands::Run => {
406 command_tx
407 .send(Message::EndpointStartAll(EndpointStartAll {}))
408 .await?;
409 }
410 Commands::Publish(ref endpoint) => {
411 command_tx
412 .send(Message::EndpointStart(endpoint.parse()?))
413 .await?;
414 }
415 Commands::Register(ref endpoint) => {
416 command_tx
417 .send(Message::EndpointStart(endpoint.parse()?))
418 .await?;
419 }
420 Commands::Unpublish(ref args) => {
421 command_tx
422 .send(Message::EndpointRemove(EndpointRemove {
423 guid: args.guid.clone(),
424 }))
425 .await?;
426 }
427 Commands::Start(ref args) => {
428 command_tx
429 .send(Message::EndpointGuidStart(EndpointStart {
430 guid: args.guid.clone(),
431 }))
432 .await?;
433 }
434 Commands::Stop(ref args) => {
435 command_tx
436 .send(Message::EndpointStop(EndpointStop {
437 guid: args.guid.clone(),
438 }))
439 .await?;
440 }
441 Commands::Ping(ref args) => {
442 pings = args.num.unwrap_or(1);
443 for _i in 0..pings {
444 ping::publish(command_tx.clone()).await?;
445 }
446 }
447 Commands::Login(_) => {
448 write_stdout(crate::t!("client-authorized"));
449 break;
450 }
451 _ => {}
452 }
453 }
454 ConnectState::Disconnected => {
455 if let Some(spinner) = current_spinner.take() {
456 spinner.finish_and_clear();
457 }
458 }
459 },
460
461 Message::Progress(info) => {
462 let progress_guid = if info.guid.is_empty() {
463 "default"
464 } else {
465 &info.guid
466 };
467
468 if info.current == 0 {
469 let bar = multi_progress.add(ProgressBar::new(info.total as u64));
471 bar.set_message(info.message.clone());
472 bar.set_style(ProgressStyle::default_bar().template(&info.template)?);
473 progress_bars.insert(progress_guid.to_string(), bar);
474 } else if info.current >= info.total {
475 if let Some(progress_bar) = progress_bars.remove(progress_guid) {
477 progress_bar.finish_and_clear();
478 }
479 } else if let Some(bar) = progress_bars.get(progress_guid) {
480 bar.set_position(info.current as u64);
482 if !info.message.is_empty() {
483 bar.set_message(info.message.clone());
484 }
485 }
486 }
487
488 Message::EndpointListAck(list) => {
489 if list.endpoints.is_empty() {
490 write_stdout(crate::t!("no-registered-services"));
491 } else {
492 let mut output = String::new();
493 let use_colors = io::stderr().is_terminal();
494
495 for ep in &list.endpoints {
496 let status = ep.status.as_deref().unwrap_or("unknown");
497 let colored_status = if use_colors {
498 match status {
499 "online" => format!("\x1b[32m{}\x1b[0m ", status), "offline" => format!("\x1b[31m{}\x1b[0m", status), "starting" => format!("\x1b[33m{}\x1b[0m", status), "stopping" => format!("\x1b[33m{}\x1b[0m", status), "error" => format!("\x1b[31m{}\x1b[0m ", status), _ => status.to_string(),
505 }
506 } else {
507 status.to_string()
508 };
509
510 output.push_str(&format!("{} {} {}\n", colored_status, ep.guid, ep));
511 }
512 write_stdout(output);
513 }
514 if !matches!(cli.command, Commands::Run) {
515 break;
516 }
517 }
518
519 Message::EndpointClearAck(_) => {
520 write_stdout(crate::t!("all-services-removed"));
521 if !matches!(cli.command, Commands::Run) {
522 break;
523 }
524 }
525
526 other => {
527 debug!("Unhandled message: {:?}", other);
528 }
529 }
530 }
531
532 command_tx.send(Message::Stop(Stop {})).await.ok();
533
534 Ok(())
535}