cloudpub_client/
base.rs

1use crate::client::run_client;
2use crate::commands::{Commands, ServiceAction};
3pub use crate::config::{ClientConfig, ClientOpts};
4use crate::ping;
5use crate::service::{create_service_manager, ServiceStatus};
6use crate::shell::get_cache_dir;
7use anyhow::{bail, Context, Result};
8use clap::Parser;
9use cloudpub_common::logging::{init_log, WorkerGuard};
10use cloudpub_common::protocol::message::Message;
11use cloudpub_common::protocol::{
12    ConnectState, EndpointClear, EndpointList, EndpointRemove, EndpointStart, EndpointStartAll,
13    EndpointStop, ErrorKind, PerformUpgrade, Stop,
14};
15use cloudpub_common::{LONG_VERSION, VERSION};
16use dirs::cache_dir;
17use futures::future::FutureExt;
18use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
19use parking_lot::RwLock;
20use std::collections::HashMap;
21use std::io::{self, IsTerminal, Write};
22use std::path::PathBuf;
23use std::sync::Arc;
24use tokio::sync::mpsc;
25use tracing::{debug, error, warn};
26
/// File name passed to `ClientConfig::load` by `init` when no explicit config path is given.
const CONFIG_FILE: &str = "client.toml";
28
29#[derive(Parser, Debug)]
30#[command(about, version(VERSION), long_version(LONG_VERSION))]
31pub struct Cli {
32    #[clap(subcommand)]
33    pub command: Commands,
34    #[clap(short, long, default_value = "debug", help = "Log level")]
35    pub log_level: String,
36    #[clap(short, long, default_value = "false", help = "Ouput log to console")]
37    pub verbose: bool,
38    #[clap(short, long, help = "Path to the config file")]
39    pub conf: Option<String>,
40    #[clap(short, long, default_value = "false", help = "Read-only config mode")]
41    pub readonly: bool,
42}
43
44fn handle_service_command(action: &ServiceAction, config: &ClientConfig) -> Result<()> {
45    // Determine config path to use for service
46    let config_path = match action {
47        ServiceAction::Install { conf: Some(path) } => {
48            // Use provided config path
49            PathBuf::from(path)
50        }
51        _ => {
52            // Use default config path
53            config.get_config_path().to_owned()
54        }
55    };
56
57    // Check if the provided config path exists
58    let actual_config = if config.token.is_some() {
59        debug!("Service config has token: {:?}", config_path);
60        config_path.clone()
61    } else {
62        #[cfg(any(target_os = "linux", target_os = "macos"))]
63        {
64            use crate::service::ServiceConfig;
65            // On Unix, check if running under sudo and use original user's config if available
66            if let Some(sudo_config) = ServiceConfig::get_sudo_user_config_path() {
67                sudo_config.clone()
68            } else {
69                config_path.clone()
70            }
71        }
72        #[cfg(not(any(target_os = "linux", target_os = "macos")))]
73        config_path.clone()
74    };
75
76    if actual_config.as_os_str().is_empty() || !actual_config.exists() {
77        bail!(crate::t!("error-config-not-found", "path" => format!("{:?}", config_path)));
78    }
79
80    let cfg = ClientConfig::from_file(&actual_config, true)
81        .context(crate::t!("error-config-load-failed", "path" => format!("{:?}", actual_config)))?;
82
83    if cfg.token.is_none() {
84        bail!(crate::t!("error-config-token-missing"));
85    }
86    // Create the appropriate service manager for the current platform
87    let service_manager = create_service_manager(actual_config);
88
89    match action {
90        ServiceAction::Install { .. } => {
91            service_manager.install()?;
92            println!("{}", crate::t!("service-installed"));
93        }
94        ServiceAction::Uninstall => {
95            service_manager.uninstall()?;
96            println!("{}", crate::t!("service-uninstalled"));
97        }
98        ServiceAction::Start => {
99            service_manager.start()?;
100            println!("{}", crate::t!("service-started"));
101        }
102        ServiceAction::Stop => {
103            service_manager.stop()?;
104            println!("{}", crate::t!("service-stopped-service"));
105        }
106        ServiceAction::Status => {
107            let status = service_manager.status()?;
108            match status {
109                ServiceStatus::Running => println!("{}", crate::t!("service-running")),
110                ServiceStatus::Stopped => println!("{}", crate::t!("service-stopped-status")),
111                ServiceStatus::NotInstalled => println!("{}", crate::t!("service-not-installed")),
112                ServiceStatus::Unknown => println!("{}", crate::t!("service-status-unknown")),
113            }
114        }
115    }
116
117    Ok(())
118}
119
120pub fn init(args: &Cli) -> Result<(WorkerGuard, Arc<RwLock<ClientConfig>>)> {
121    // Raise `nofile` limit on linux and mac
122    if let Err(err) = fdlimit::raise_fd_limit() {
123        warn!("Failed to raise file descriptor limit: {}", err);
124    }
125
126    // Create log directory
127    let log_dir = cache_dir().context("Can't get cache dir")?.join("cloudpub");
128    std::fs::create_dir_all(&log_dir).context("Can't create log dir")?;
129
130    let log_file = log_dir.join("client.log");
131
132    let guard = init_log(
133        &args.log_level,
134        &log_file,
135        args.verbose,
136        10 * 1024 * 1024,
137        2,
138    )
139    .context("Failed to initialize logging")?;
140
141    let config = if let Some(path) = args.conf.as_ref() {
142        ClientConfig::from_file(&path.into(), args.readonly)?
143    } else {
144        ClientConfig::load(CONFIG_FILE, true, args.readonly)?
145    };
146    let config = Arc::new(RwLock::new(config));
147    Ok((guard, config))
148}
149
150#[tokio::main]
151pub async fn cli_main(cli: Cli, config: Arc<RwLock<ClientConfig>>) -> Result<()> {
152    ctrlc::set_handler(move || {
153        std::process::exit(1);
154    })
155    .context("Error setting Ctrl-C handler")?;
156
157    let (command_tx, command_rx) = mpsc::channel(1024);
158    main_loop(cli, config, command_tx, command_rx).await
159}
160
161fn make_spinner(msg: String) -> ProgressBar {
162    let spinner = ProgressBar::new_spinner();
163    let style = ProgressStyle::default_spinner()
164        .template("{spinner} {msg}")
165        .unwrap();
166    #[cfg(target_os = "windows")]
167    let style = style.tick_chars("-\\|/ ");
168    spinner.set_style(style);
169    spinner.set_message(msg);
170    #[cfg(unix)]
171    spinner.enable_steady_tick(std::time::Duration::from_millis(100));
172    spinner
173}
174
175pub async fn main_loop(
176    mut cli: Cli,
177    config: Arc<RwLock<ClientConfig>>,
178    command_tx: mpsc::Sender<Message>,
179    command_rx: mpsc::Receiver<Message>,
180) -> Result<()> {
181    let mut opts = ClientOpts::default();
182    let write_stdout = |res: String| {
183        println!("{}", res);
184    };
185
186    let write_stderr = |res: String| {
187        eprintln!("{}", res);
188    };
189
190    let (result_tx, mut result_rx) = mpsc::channel(1024);
191
192    let mut pings = 1;
193
194    opts.secondary = match &mut cli.command {
195        Commands::Set(set_args) => {
196            config.write().set(&set_args.key, &set_args.value)?;
197            return Ok(());
198        }
199        Commands::Get(get_args) => {
200            let value = config.read().get(&get_args.key)?;
201            write_stdout(value);
202            return Ok(());
203        }
204        Commands::Options => {
205            let options = config.read().get_all_options();
206            let mut output = String::new();
207            for (key, value) in options {
208                output.push_str(&format!("{}: {}\n", key, value));
209            }
210            write_stdout(output.trim_end().to_string());
211            return Ok(());
212        }
213        Commands::Purge => {
214            let cache_dir = get_cache_dir("")?;
215            debug!(
216                "{}",
217                crate::t!("purge-cache-dir", "path" => cache_dir.to_str().unwrap())
218            );
219            std::fs::remove_dir_all(&cache_dir).ok();
220            return Ok(());
221        }
222        Commands::Login(args) => {
223            let email = match &args.email {
224                Some(email) => email.clone(),
225                None => {
226                    // Prompt the user for email
227                    print!("{}", crate::t!("enter-email"));
228                    std::io::stdout().flush().ok();
229                    let mut email = String::new();
230                    std::io::stdin().read_line(&mut email)?;
231                    email.trim().to_string()
232                }
233            };
234
235            let password = match &args.password {
236                Some(pwd) => pwd.clone(),
237                None => {
238                    // Try to read from environment first
239                    if let Ok(pwd) = std::env::var("PASSWORD") {
240                        pwd
241                    } else {
242                        // If not in environment, prompt the user
243                        print!("{}", crate::t!("enter-password"));
244                        std::io::stdout().flush().ok();
245                        rpassword::read_password().unwrap_or_default()
246                    }
247                }
248            };
249            opts.credentials = Some((email, password));
250            true
251        }
252        Commands::Logout => {
253            config.write().token = None;
254            config
255                .write()
256                .save()
257                .context("Failed to save config after logout")?;
258            write_stderr(crate::t!("session-terminated"));
259            return Ok(());
260        }
261        Commands::Register(publish_args) => {
262            config.read().validate()?;
263            publish_args.parse()?;
264            true
265        }
266        Commands::Publish(publish_args) => {
267            config.read().validate()?;
268            publish_args.parse()?;
269            false
270        }
271
272        Commands::Ping(_) => {
273            opts.transient = true;
274            true
275        }
276
277        Commands::Run { run_as_service } => {
278            opts.is_service = *run_as_service;
279            false
280        }
281
282        Commands::Unpublish(_)
283        | Commands::Start(_)
284        | Commands::Stop(_)
285        | Commands::Break
286        | Commands::Ls
287        | Commands::Clean
288        | Commands::Upgrade => {
289            config.read().validate()?;
290            true
291        }
292        Commands::Service { action } => {
293            return handle_service_command(action, &config.read());
294        }
295    };
296
297    debug!("Config: {:?}", config);
298
299    tokio::spawn(async move {
300        if let Err(err) = run_client(config.clone(), opts, command_rx, result_tx)
301            .boxed()
302            .await
303        {
304            error!("Error running client: {:?}", err);
305        }
306    });
307
308    let mut current_spinner = None;
309    let multi_progress = MultiProgress::new();
310    let mut progress_bars: HashMap<String, ProgressBar> = HashMap::new();
311
312    loop {
313        match result_rx
314            .recv()
315            .await
316            .context("Failed to receive message")?
317        {
318            Message::Error(err) => {
319                let kind: ErrorKind = err.kind.try_into().unwrap_or(ErrorKind::Fatal);
320                if kind == ErrorKind::Fatal || kind == ErrorKind::AuthFailed {
321                    command_tx.send(Message::Stop(Stop {})).await.ok();
322                    bail!("{}", err.message);
323                } else {
324                    write_stderr(err.message.to_string());
325                }
326            }
327
328            Message::UpgradeAvailable(info) => match cli.command {
329                Commands::Upgrade => {
330                    command_tx
331                        .send(Message::PerformUpgrade(PerformUpgrade {
332                            version: info.version.clone(),
333                        }))
334                        .await
335                        .context("Failed to send upgrade message")?;
336                }
337                Commands::Run { .. } | Commands::Publish(_) => {
338                    write_stderr(crate::t!("upgrade-available", "version" => info.version.clone()));
339                }
340                _ => {}
341            },
342
343            Message::EndpointAck(endpoint) => {
344                if endpoint.status == Some("online".to_string()) {
345                    match cli.command {
346                        Commands::Ping(ref args) => {
347                            current_spinner = Some(make_spinner(crate::t!("measuring-speed")));
348                            let stats = ping::ping_test(endpoint, args.bare).await?;
349                            current_spinner.take();
350                            if args.bare {
351                                write_stdout(stats.to_string());
352                            } else {
353                                write_stdout(stats);
354                            }
355                            pings -= 1;
356                            if pings == 0 {
357                                break;
358                            }
359                        }
360                        Commands::Register(_) => {
361                            write_stdout(
362                                crate::t!("service-registered", "endpoint" => endpoint.to_string()),
363                            );
364                            break;
365                        }
366                        Commands::Publish(_) | Commands::Run { .. } => {
367                            if endpoint.error.is_empty() {
368                                write_stdout(
369                                    crate::t!("service-published", "endpoint" => endpoint.to_string()),
370                                )
371                            } else {
372                                write_stdout(
373                                    crate::t!("service-error", "endpoint" => endpoint.to_string()),
374                                )
375                            }
376                        }
377                        _ => {}
378                    }
379                }
380            }
381
382            Message::EndpointStopAck(ep) => {
383                write_stdout(crate::t!("service-stopped", "guid" => ep.guid));
384                if matches!(cli.command, Commands::Unpublish(_)) {
385                    break;
386                }
387            }
388
389            Message::EndpointRemoveAck(ep) => {
390                write_stdout(crate::t!("service-removed", "guid" => ep.guid));
391                if matches!(cli.command, Commands::Unpublish(_)) {
392                    break;
393                }
394            }
395
396            Message::ConnectState(st) => match st.try_into().unwrap_or(ConnectState::Connecting) {
397                ConnectState::Connecting => {
398                    current_spinner = Some(make_spinner(crate::t!("connecting")));
399                }
400
401                ConnectState::Connected => {
402                    if let Some(spinner) = current_spinner.take() {
403                        spinner.finish_and_clear();
404                    }
405
406                    match cli.command {
407                        Commands::Ls => {
408                            command_tx
409                                .send(Message::EndpointList(EndpointList {}))
410                                .await?;
411                        }
412                        Commands::Clean => {
413                            command_tx
414                                .send(Message::EndpointClear(EndpointClear {}))
415                                .await?;
416                        }
417                        Commands::Run { .. } => {
418                            command_tx
419                                .send(Message::EndpointStartAll(EndpointStartAll {}))
420                                .await?;
421                        }
422                        Commands::Publish(ref endpoint) => {
423                            command_tx
424                                .send(Message::EndpointStart(endpoint.parse()?))
425                                .await?;
426                        }
427                        Commands::Register(ref endpoint) => {
428                            command_tx
429                                .send(Message::EndpointStart(endpoint.parse()?))
430                                .await?;
431                        }
432                        Commands::Unpublish(ref args) => {
433                            command_tx
434                                .send(Message::EndpointRemove(EndpointRemove {
435                                    guid: args.guid.clone(),
436                                }))
437                                .await?;
438                        }
439                        Commands::Start(ref args) => {
440                            command_tx
441                                .send(Message::EndpointGuidStart(EndpointStart {
442                                    guid: args.guid.clone(),
443                                }))
444                                .await?;
445                        }
446                        Commands::Stop(ref args) => {
447                            command_tx
448                                .send(Message::EndpointStop(EndpointStop {
449                                    guid: args.guid.clone(),
450                                }))
451                                .await?;
452                        }
453                        Commands::Ping(ref args) => {
454                            pings = args.num.unwrap_or(1);
455                            for _i in 0..pings {
456                                ping::publish(command_tx.clone()).await?;
457                            }
458                        }
459                        Commands::Login(_) => {
460                            write_stdout(crate::t!("client-authorized"));
461                            break;
462                        }
463                        _ => {}
464                    }
465                }
466                ConnectState::Disconnected => {
467                    if let Some(spinner) = current_spinner.take() {
468                        spinner.finish_and_clear();
469                    }
470                }
471            },
472
473            Message::Progress(info) => {
474                let progress_guid = if info.guid.is_empty() {
475                    "default"
476                } else {
477                    &info.guid
478                };
479
480                if info.current == 0 {
481                    // Create a new progress bar for this GUID
482                    let bar = multi_progress.add(ProgressBar::new(info.total as u64));
483                    bar.set_message(info.message.clone());
484                    bar.set_style(ProgressStyle::default_bar().template(&info.template)?);
485                    progress_bars.insert(progress_guid.to_string(), bar);
486                } else if info.current >= info.total {
487                    // Progress completed, remove and finish the progress bar
488                    if let Some(progress_bar) = progress_bars.remove(progress_guid) {
489                        progress_bar.finish_and_clear();
490                    }
491                } else if let Some(bar) = progress_bars.get(progress_guid) {
492                    // Update existing progress bar
493                    bar.set_position(info.current as u64);
494                    if !info.message.is_empty() {
495                        bar.set_message(info.message.clone());
496                    }
497                }
498            }
499
500            Message::EndpointListAck(list) => {
501                if list.endpoints.is_empty() {
502                    write_stdout(crate::t!("no-registered-services"));
503                } else {
504                    let mut output = String::new();
505                    let use_colors = io::stderr().is_terminal();
506
507                    for ep in &list.endpoints {
508                        let status = ep.status.as_deref().unwrap_or("unknown");
509                        let colored_status = if use_colors {
510                            match status {
511                                "online" => format!("\x1b[32m{}\x1b[0m ", status), // Green
512                                "offline" => format!("\x1b[31m{}\x1b[0m", status), // Red
513                                "starting" => format!("\x1b[33m{}\x1b[0m", status), // Yellow
514                                "stopping" => format!("\x1b[33m{}\x1b[0m", status), // Yellow
515                                "error" => format!("\x1b[31m{}\x1b[0m   ", status), // Red
516                                _ => status.to_string(),
517                            }
518                        } else {
519                            status.to_string()
520                        };
521
522                        output.push_str(&format!("{} {} {}\n", colored_status, ep.guid, ep));
523                    }
524                    write_stdout(output);
525                }
526                if !matches!(cli.command, Commands::Run { .. }) {
527                    break;
528                }
529            }
530
531            Message::EndpointClearAck(_) => {
532                write_stdout(crate::t!("all-services-removed"));
533                if !matches!(cli.command, Commands::Run { .. }) {
534                    break;
535                }
536            }
537
538            other => {
539                debug!("Unhandled message: {:?}", other);
540            }
541        }
542    }
543
544    command_tx.send(Message::Stop(Stop {})).await.ok();
545
546    Ok(())
547}