cloudpub_client/
base.rs

1use crate::client::run_client;
2use crate::commands::{Commands, ServiceAction};
3pub use crate::config::{ClientConfig, ClientOpts};
4use crate::ping;
5use crate::service::{create_service_manager, ServiceConfig, ServiceStatus};
6use crate::shell::get_cache_dir;
7use anyhow::{bail, Context, Result};
8use clap::Parser;
9use cloudpub_common::logging::{init_log, WorkerGuard};
10use cloudpub_common::protocol::message::Message;
11use cloudpub_common::protocol::{
12    ConnectState, EndpointClear, EndpointList, EndpointRemove, EndpointStart, EndpointStartAll,
13    EndpointStop, ErrorKind, PerformUpgrade, Stop,
14};
15use cloudpub_common::{LONG_VERSION, VERSION};
16use dirs::cache_dir;
17use futures::future::FutureExt;
18use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
19use parking_lot::RwLock;
20use std::collections::HashMap;
21use std::env;
22use std::io::{self, IsTerminal, Write};
23use std::sync::Arc;
24use tokio::sync::mpsc;
25use tracing::{debug, error, warn};
26
// Config file name handed to `ClientConfig::load` when no `--conf` path is given.
const CONFIG_FILE: &str = "client.toml";
28
29#[derive(Parser, Debug)]
30#[command(about, version(VERSION), long_version(LONG_VERSION))]
31pub struct Cli {
32    #[clap(subcommand)]
33    pub command: Commands,
34    #[clap(short, long, default_value = "debug", help = "Log level")]
35    pub log_level: String,
36    #[clap(short, long, default_value = "false", help = "Ouput log to console")]
37    pub verbose: bool,
38    #[clap(short, long, help = "Path to the config file")]
39    pub conf: Option<String>,
40    #[clap(short, long, default_value = "false", help = "Read-only config mode")]
41    pub readonly: bool,
42}
43
44fn handle_service_command(action: &ServiceAction, config: &ClientConfig) -> Result<()> {
45    // Get the current executable path
46    let exe_path = env::current_exe().context("Failed to get current executable path")?;
47
48    // Prepare config file argument
49    let mut args = Vec::new();
50    if let Some(path_str) = config.get_config_path().to_str() {
51        args.push("--conf".to_string());
52        args.push(path_str.to_string());
53    }
54
55    // Add the run command for the service
56    args.push("run".to_string());
57
58    // On Windows, add the service flag
59    #[cfg(target_os = "windows")]
60    {
61        args.push("--run-as-service".to_string());
62    }
63
64    // Create service configuration
65    let service_config = ServiceConfig {
66        #[cfg(target_os = "macos")]
67        name: "ru.cloudpub.clo".to_string(),
68        #[cfg(not(target_os = "macos"))]
69        name: "cloudpub".to_string(),
70        display_name: "CloudPub Client".to_string(),
71        description: "CloudPub Client Service".to_string(),
72        executable_path: exe_path,
73        args,
74        config_path: Some(config.get_config_path().to_owned()),
75    };
76
77    // Create the appropriate service manager for the current platform
78    let service_manager = create_service_manager(service_config);
79
80    match action {
81        ServiceAction::Install => {
82            service_manager.install()?;
83            println!("{}", crate::t!("service-installed"));
84        }
85        ServiceAction::Uninstall => {
86            service_manager.uninstall()?;
87            println!("{}", crate::t!("service-uninstalled"));
88        }
89        ServiceAction::Start => {
90            service_manager.start()?;
91            println!("{}", crate::t!("service-started"));
92        }
93        ServiceAction::Stop => {
94            service_manager.stop()?;
95            println!("{}", crate::t!("service-stopped-service"));
96        }
97        ServiceAction::Status => {
98            let status = service_manager.status()?;
99            match status {
100                ServiceStatus::Running => println!("{}", crate::t!("service-running")),
101                ServiceStatus::Stopped => println!("{}", crate::t!("service-stopped-status")),
102                ServiceStatus::NotInstalled => println!("{}", crate::t!("service-not-installed")),
103                ServiceStatus::Unknown => println!("{}", crate::t!("service-status-unknown")),
104            }
105        }
106    }
107
108    Ok(())
109}
110
111pub fn init(args: &Cli) -> Result<(WorkerGuard, Arc<RwLock<ClientConfig>>)> {
112    // Raise `nofile` limit on linux and mac
113    if let Err(err) = fdlimit::raise_fd_limit() {
114        warn!("Failed to raise file descriptor limit: {}", err);
115    }
116
117    // Create log directory
118    let log_dir = cache_dir().context("Can't get cache dir")?.join("cloudpub");
119    std::fs::create_dir_all(&log_dir).context("Can't create log dir")?;
120
121    let log_file = log_dir.join("client.log");
122
123    let guard = init_log(
124        &args.log_level,
125        &log_file,
126        args.verbose,
127        10 * 1024 * 1024,
128        2,
129    )
130    .context("Failed to initialize logging")?;
131
132    let config = if let Some(path) = args.conf.as_ref() {
133        ClientConfig::from_file(&path.into(), args.readonly)?
134    } else {
135        ClientConfig::load(CONFIG_FILE, true, args.readonly)?
136    };
137    let config = Arc::new(RwLock::new(config));
138    Ok((guard, config))
139}
140
141#[tokio::main]
142pub async fn cli_main(cli: Cli, config: Arc<RwLock<ClientConfig>>) -> Result<()> {
143    ctrlc::set_handler(move || {
144        std::process::exit(1);
145    })
146    .context("Error setting Ctrl-C handler")?;
147
148    let (command_tx, command_rx) = mpsc::channel(1024);
149    main_loop(cli, config, command_tx, command_rx).await
150}
151
152fn make_spinner(msg: String) -> ProgressBar {
153    let spinner = ProgressBar::new_spinner();
154    let style = ProgressStyle::default_spinner()
155        .template("{spinner} {msg}")
156        .unwrap();
157    #[cfg(target_os = "windows")]
158    let style = style.tick_chars("-\\|/ ");
159    spinner.set_style(style);
160    spinner.set_message(msg);
161    #[cfg(unix)]
162    spinner.enable_steady_tick(std::time::Duration::from_millis(100));
163    spinner
164}
165
166pub async fn main_loop(
167    mut cli: Cli,
168    config: Arc<RwLock<ClientConfig>>,
169    command_tx: mpsc::Sender<Message>,
170    command_rx: mpsc::Receiver<Message>,
171) -> Result<()> {
172    let mut opts = ClientOpts::default();
173    let write_stdout = |res: String| {
174        println!("{}", res);
175    };
176
177    let write_stderr = |res: String| {
178        eprintln!("{}", res);
179    };
180
181    let (result_tx, mut result_rx) = mpsc::channel(1024);
182
183    let mut pings = 1;
184
185    opts.secondary = match &mut cli.command {
186        Commands::Set(set_args) => {
187            config.write().set(&set_args.key, &set_args.value)?;
188            return Ok(());
189        }
190        Commands::Get(get_args) => {
191            let value = config.read().get(&get_args.key)?;
192            write_stdout(value);
193            return Ok(());
194        }
195        Commands::Options => {
196            let options = config.read().get_all_options();
197            let mut output = String::new();
198            for (key, value) in options {
199                output.push_str(&format!("{}: {}\n", key, value));
200            }
201            write_stdout(output.trim_end().to_string());
202            return Ok(());
203        }
204        Commands::Purge => {
205            let cache_dir = get_cache_dir("")?;
206            debug!(
207                "{}",
208                crate::t!("purge-cache-dir", "path" => cache_dir.to_str().unwrap())
209            );
210            std::fs::remove_dir_all(&cache_dir).ok();
211            return Ok(());
212        }
213        Commands::Login(args) => {
214            let email = match &args.email {
215                Some(email) => email.clone(),
216                None => {
217                    // Prompt the user for email
218                    print!("{}", crate::t!("enter-email"));
219                    std::io::stdout().flush().ok();
220                    let mut email = String::new();
221                    std::io::stdin().read_line(&mut email)?;
222                    email.trim().to_string()
223                }
224            };
225
226            let password = match &args.password {
227                Some(pwd) => pwd.clone(),
228                None => {
229                    // Try to read from environment first
230                    if let Ok(pwd) = std::env::var("PASSWORD") {
231                        pwd
232                    } else {
233                        // If not in environment, prompt the user
234                        print!("{}", crate::t!("enter-password"));
235                        std::io::stdout().flush().ok();
236                        rpassword::read_password().unwrap_or_default()
237                    }
238                }
239            };
240            opts.credentials = Some((email, password));
241            true
242        }
243        Commands::Logout => {
244            config.write().token = None;
245            config
246                .write()
247                .save()
248                .context("Failed to save config after logout")?;
249            write_stderr(crate::t!("session-terminated"));
250            return Ok(());
251        }
252        Commands::Register(publish_args) => {
253            config.read().validate()?;
254            publish_args.parse()?;
255            true
256        }
257        Commands::Publish(publish_args) => {
258            config.read().validate()?;
259            publish_args.parse()?;
260            false
261        }
262
263        Commands::Ping(_) => {
264            opts.transient = true;
265            true
266        }
267
268        Commands::Run => false,
269
270        Commands::Unpublish(_)
271        | Commands::Start(_)
272        | Commands::Stop(_)
273        | Commands::Break
274        | Commands::Ls
275        | Commands::Clean
276        | Commands::Upgrade => {
277            config.read().validate()?;
278            true
279        }
280        Commands::Service { action } => {
281            return handle_service_command(action, &config.read());
282        }
283    };
284
285    debug!("Config: {:?}", config);
286
287    tokio::spawn(async move {
288        if let Err(err) = run_client(config.clone(), opts, command_rx, result_tx)
289            .boxed()
290            .await
291        {
292            error!("Error running client: {:?}", err);
293        }
294    });
295
296    let mut current_spinner = None;
297    let multi_progress = MultiProgress::new();
298    let mut progress_bars: HashMap<String, ProgressBar> = HashMap::new();
299
300    loop {
301        match result_rx
302            .recv()
303            .await
304            .context("Failed to receive message")?
305        {
306            Message::Error(err) => {
307                let kind: ErrorKind = err.kind.try_into().unwrap_or(ErrorKind::Fatal);
308                if kind == ErrorKind::Fatal || kind == ErrorKind::AuthFailed {
309                    command_tx.send(Message::Stop(Stop {})).await.ok();
310                    bail!("{}", err.message);
311                } else {
312                    write_stderr(err.message.to_string());
313                }
314            }
315
316            Message::UpgradeAvailable(info) => match cli.command {
317                Commands::Upgrade => {
318                    command_tx
319                        .send(Message::PerformUpgrade(PerformUpgrade {
320                            version: info.version.clone(),
321                        }))
322                        .await
323                        .context("Failed to send upgrade message")?;
324                }
325                Commands::Run | Commands::Publish(_) => {
326                    write_stderr(crate::t!("upgrade-available", "version" => info.version.clone()));
327                }
328                _ => {}
329            },
330
331            Message::EndpointAck(endpoint) => {
332                if endpoint.status == Some("online".to_string()) {
333                    match cli.command {
334                        Commands::Ping(ref args) => {
335                            current_spinner = Some(make_spinner(crate::t!("measuring-speed")));
336                            let stats = ping::ping_test(endpoint, args.bare).await?;
337                            current_spinner.take();
338                            if args.bare {
339                                write_stdout(stats.to_string());
340                            } else {
341                                write_stdout(stats);
342                            }
343                            pings -= 1;
344                            if pings == 0 {
345                                break;
346                            }
347                        }
348                        Commands::Register(_) => {
349                            write_stdout(
350                                crate::t!("service-registered", "endpoint" => endpoint.to_string()),
351                            );
352                            break;
353                        }
354                        Commands::Publish(_) | Commands::Run => {
355                            if endpoint.error.is_empty() {
356                                write_stdout(
357                                    crate::t!("service-published", "endpoint" => endpoint.to_string()),
358                                )
359                            } else {
360                                write_stdout(
361                                    crate::t!("service-error", "endpoint" => endpoint.to_string()),
362                                )
363                            }
364                        }
365                        _ => {}
366                    }
367                }
368            }
369
370            Message::EndpointStopAck(ep) => {
371                write_stdout(crate::t!("service-stopped", "guid" => ep.guid));
372                if matches!(cli.command, Commands::Unpublish(_)) {
373                    break;
374                }
375            }
376
377            Message::EndpointRemoveAck(ep) => {
378                write_stdout(crate::t!("service-removed", "guid" => ep.guid));
379                if matches!(cli.command, Commands::Unpublish(_)) {
380                    break;
381                }
382            }
383
384            Message::ConnectState(st) => match st.try_into().unwrap_or(ConnectState::Connecting) {
385                ConnectState::Connecting => {
386                    current_spinner = Some(make_spinner(crate::t!("connecting")));
387                }
388
389                ConnectState::Connected => {
390                    if let Some(spinner) = current_spinner.take() {
391                        spinner.finish_and_clear();
392                    }
393
394                    match cli.command {
395                        Commands::Ls => {
396                            command_tx
397                                .send(Message::EndpointList(EndpointList {}))
398                                .await?;
399                        }
400                        Commands::Clean => {
401                            command_tx
402                                .send(Message::EndpointClear(EndpointClear {}))
403                                .await?;
404                        }
405                        Commands::Run => {
406                            command_tx
407                                .send(Message::EndpointStartAll(EndpointStartAll {}))
408                                .await?;
409                        }
410                        Commands::Publish(ref endpoint) => {
411                            command_tx
412                                .send(Message::EndpointStart(endpoint.parse()?))
413                                .await?;
414                        }
415                        Commands::Register(ref endpoint) => {
416                            command_tx
417                                .send(Message::EndpointStart(endpoint.parse()?))
418                                .await?;
419                        }
420                        Commands::Unpublish(ref args) => {
421                            command_tx
422                                .send(Message::EndpointRemove(EndpointRemove {
423                                    guid: args.guid.clone(),
424                                }))
425                                .await?;
426                        }
427                        Commands::Start(ref args) => {
428                            command_tx
429                                .send(Message::EndpointGuidStart(EndpointStart {
430                                    guid: args.guid.clone(),
431                                }))
432                                .await?;
433                        }
434                        Commands::Stop(ref args) => {
435                            command_tx
436                                .send(Message::EndpointStop(EndpointStop {
437                                    guid: args.guid.clone(),
438                                }))
439                                .await?;
440                        }
441                        Commands::Ping(ref args) => {
442                            pings = args.num.unwrap_or(1);
443                            for _i in 0..pings {
444                                ping::publish(command_tx.clone()).await?;
445                            }
446                        }
447                        Commands::Login(_) => {
448                            write_stdout(crate::t!("client-authorized"));
449                            break;
450                        }
451                        _ => {}
452                    }
453                }
454                ConnectState::Disconnected => {
455                    if let Some(spinner) = current_spinner.take() {
456                        spinner.finish_and_clear();
457                    }
458                }
459            },
460
461            Message::Progress(info) => {
462                let progress_guid = if info.guid.is_empty() {
463                    "default"
464                } else {
465                    &info.guid
466                };
467
468                if info.current == 0 {
469                    // Create a new progress bar for this GUID
470                    let bar = multi_progress.add(ProgressBar::new(info.total as u64));
471                    bar.set_message(info.message.clone());
472                    bar.set_style(ProgressStyle::default_bar().template(&info.template)?);
473                    progress_bars.insert(progress_guid.to_string(), bar);
474                } else if info.current >= info.total {
475                    // Progress completed, remove and finish the progress bar
476                    if let Some(progress_bar) = progress_bars.remove(progress_guid) {
477                        progress_bar.finish_and_clear();
478                    }
479                } else if let Some(bar) = progress_bars.get(progress_guid) {
480                    // Update existing progress bar
481                    bar.set_position(info.current as u64);
482                    if !info.message.is_empty() {
483                        bar.set_message(info.message.clone());
484                    }
485                }
486            }
487
488            Message::EndpointListAck(list) => {
489                if list.endpoints.is_empty() {
490                    write_stdout(crate::t!("no-registered-services"));
491                } else {
492                    let mut output = String::new();
493                    let use_colors = io::stderr().is_terminal();
494
495                    for ep in &list.endpoints {
496                        let status = ep.status.as_deref().unwrap_or("unknown");
497                        let colored_status = if use_colors {
498                            match status {
499                                "online" => format!("\x1b[32m{}\x1b[0m ", status), // Green
500                                "offline" => format!("\x1b[31m{}\x1b[0m", status), // Red
501                                "starting" => format!("\x1b[33m{}\x1b[0m", status), // Yellow
502                                "stopping" => format!("\x1b[33m{}\x1b[0m", status), // Yellow
503                                "error" => format!("\x1b[31m{}\x1b[0m   ", status), // Red
504                                _ => status.to_string(),
505                            }
506                        } else {
507                            status.to_string()
508                        };
509
510                        output.push_str(&format!("{} {} {}\n", colored_status, ep.guid, ep));
511                    }
512                    write_stdout(output);
513                }
514                if !matches!(cli.command, Commands::Run) {
515                    break;
516                }
517            }
518
519            Message::EndpointClearAck(_) => {
520                write_stdout(crate::t!("all-services-removed"));
521                if !matches!(cli.command, Commands::Run) {
522                    break;
523                }
524            }
525
526            other => {
527                debug!("Unhandled message: {:?}", other);
528            }
529        }
530    }
531
532    command_tx.send(Message::Stop(Stop {})).await.ok();
533
534    Ok(())
535}