use anyhow::{bail, Context, Result};
use cloudpub_client::client::run_client;
pub use cloudpub_client::config::{ClientConfig, ClientOpts};
use cloudpub_common::config::MaskedString;
use cloudpub_common::logging::init_log;
use dirs::cache_dir;
use futures::future::FutureExt;
use parking_lot::RwLock;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use tracing::warn;
use crate::connection::{CheckSignalFn, Connection, ConnectionEvent};
/// Fluent builder that collects the settings needed to start a client and
/// produce a [`Connection`] via [`ConnectionBuilder::build`].
///
/// Authentication is configured either with a token or with an
/// email/password pair; the setter methods keep the two mutually exclusive.
pub struct ConnectionBuilder {
    /// Explicit configuration file; when `None`, `build` falls back to
    /// loading `client.toml`.
    config_path: Option<PathBuf>,
    /// Log level string handed to the logging initialiser.
    log_level: String,
    /// Verbose flag handed to the logging initialiser.
    verbose: bool,
    /// Token that, when present, replaces the token in the loaded config.
    token: Option<String>,
    /// Account email; must be set together with `password`.
    email: Option<String>,
    /// Account password; must be set together with `email`.
    password: Option<String>,
    /// Timeout forwarded to the resulting [`Connection`].
    timeout: Duration,
    /// Optional signal-check callback forwarded to the resulting
    /// [`Connection`].
    check_signal_fn: Option<CheckSignalFn>,
}
impl Default for ConnectionBuilder {
fn default() -> Self {
Self {
config_path: None,
log_level: "info".to_string(),
verbose: false,
token: None,
email: None,
password: None,
timeout: Duration::from_secs(10),
check_signal_fn: None,
}
}
}
impl ConnectionBuilder {
pub fn new() -> Self {
Self::default()
}
pub fn config_path<P: AsRef<Path>>(mut self, path: P) -> Self {
self.config_path = Some(path.as_ref().to_path_buf());
self
}
pub fn log_level<S: Into<String>>(mut self, level: S) -> Self {
self.log_level = level.into();
self
}
pub fn verbose(mut self, verbose: bool) -> Self {
self.verbose = verbose;
self
}
pub fn token<S: Into<String>>(mut self, token: S) -> Self {
self.token = Some(token.into());
self.email = None;
self.password = None;
self
}
pub fn credentials<S: Into<String>>(mut self, email: S, password: S) -> Self {
self.email = Some(email.into());
self.password = Some(password.into());
self.token = None;
self
}
pub fn email<S: Into<String>>(mut self, email: S) -> Self {
self.email = Some(email.into());
self.token = None;
self
}
pub fn password<S: Into<String>>(mut self, password: S) -> Self {
self.password = Some(password.into());
self.token = None;
self
}
pub fn timeout(mut self, timeout: Duration) -> Self {
self.timeout = timeout;
self
}
pub fn timeout_secs(mut self, secs: u64) -> Self {
self.timeout = Duration::from_secs(secs);
self
}
pub fn check_signal_fn(mut self, check_fn: CheckSignalFn) -> Self {
self.check_signal_fn = Some(check_fn);
self
}
pub async fn build(self) -> Result<Connection> {
if self.email.is_some() && self.password.is_none() {
bail!("Password is required when email is provided");
}
if self.password.is_some() && self.email.is_none() {
bail!("Email is required when password is provided");
}
if let Err(err) = fdlimit::raise_fd_limit() {
warn!("Failed to raise file descriptor limit: {}", err);
}
let log_dir = cache_dir().context("Can't get cache dir")?.join("cloudpub");
std::fs::create_dir_all(&log_dir).context("Can't create log dir")?;
let log_file = log_dir.join("client.log");
let _guard = init_log(
&self.log_level,
&log_file,
self.verbose,
10 * 1024 * 1024,
2,
)
.context("Failed to initialize logging")?;
let mut config = if let Some(ref path) = self.config_path {
ClientConfig::from_file(path, false)?
} else {
ClientConfig::load("client.toml", true, false)?
};
if let Some(token_value) = self.token {
config.token = Some(MaskedString(token_value));
}
let config = Arc::new(RwLock::new(config));
let (command_tx, command_rx) = mpsc::channel(1024);
let (result_tx, result_rx) = mpsc::channel(1024);
let opts = if let Some(email_value) = self.email {
if let Some(password_value) = self.password {
ClientOpts {
credentials: Some((email_value, password_value)),
..Default::default()
}
} else {
ClientOpts::default()
}
} else {
ClientOpts::default()
};
let config_clone = config.clone();
let client_handle = tokio::spawn(async move {
if let Err(err) = run_client(config_clone, opts, command_rx, result_tx)
.boxed()
.await
{
tracing::error!("Client exited with error: {:?}, restarting in 5 sec..", err);
}
});
let connection = Connection::new(
config,
command_tx,
result_rx,
self.timeout,
self.check_signal_fn,
_guard,
Some(client_handle),
);
connection
.wait_for_event(|event| matches!(event, ConnectionEvent::Connected))
.await?;
Ok(connection)
}
}