use chrono::{SecondsFormat, Utc};
use futures::{executor::ThreadPool, task::SpawnExt};
use std::{
collections::HashSet,
iter::FromIterator,
sync::{Arc, Mutex},
};
use tracing::{debug, info};
use crate::{
bucket::{Bucket, BucketUpdateProgressContext},
cache::CacheFile,
error::{Error, Fallible},
event::Event,
internal, package,
package::{InstallInfo, Package, QueryOption},
Session, SyncOption,
};
pub fn bucket_add(session: &Session, name: &str, remote_url: &str) -> Fallible<()> {
let config = session.config();
let mut path = config.root_path().to_owned();
path.push("buckets");
internal::fs::ensure_dir(&path)?;
path.push(name);
if path.exists() {
return Err(Error::BucketAlreadyExists(name.to_owned()));
}
let proxy = config.proxy();
let remote_url = match remote_url.is_empty() {
false => remote_url,
true => crate::constant::BUILTIN_BUCKET_LIST
.iter()
.find(|&&(n, _)| n == name)
.map(|&(_, remote)| remote)
.ok_or_else(|| Error::BucketAddRemoteRequired(name.to_owned()))?,
};
internal::git::clone_repo(remote_url, path, proxy)
}
pub fn bucket_list(session: &Session) -> Fallible<Vec<Bucket>> {
crate::bucket::bucket_added(session).map(|mut buckets| {
buckets.sort_by_key(|b| b.name().to_owned());
buckets
})
}
pub fn bucket_list_known() -> Vec<(&'static str, &'static str)> {
crate::constant::BUILTIN_BUCKET_LIST.to_vec()
}
pub fn bucket_update(session: &Session) -> Fallible<()> {
let buckets = crate::bucket::bucket_added(session)?;
if buckets.is_empty() {
if let Some(tx) = session.emitter() {
let _ = tx.send(Event::BucketUpdateDone);
}
return Ok(());
}
let mut config = session.config_mut()?;
let any_bucket_updated = Arc::new(Mutex::new(false));
let mut tasks = Vec::new();
let pool = ThreadPool::builder().create()?;
let proxy = config.proxy().map(|s| s.to_owned());
let emitter = session.emitter();
for bucket in buckets.iter() {
let repo = bucket.path().to_owned();
if bucket.remote_url().is_none() {
info!("ignored non-updatable bucket '{}'", bucket.name());
continue;
}
let name = bucket.name().to_owned();
let flag = Arc::clone(&any_bucket_updated);
let proxy = proxy.clone();
let emitter = emitter.clone();
let task = pool
.spawn_with_handle(async move {
let mut ctx = BucketUpdateProgressContext::new(name.as_str());
if let Some(tx) = emitter.clone() {
let _ = tx.send(Event::BucketUpdateProgress(ctx.clone()));
}
match internal::git::reset_head(repo, proxy) {
Ok(_) => {
*flag.lock().unwrap() = true;
if let Some(tx) = emitter {
ctx.set_succeeded();
let _ = tx.send(Event::BucketUpdateProgress(ctx));
}
}
Err(err) => {
if let Some(tx) = emitter {
ctx.set_failed(err.to_string().as_str());
let _ = tx.send(Event::BucketUpdateProgress(ctx));
}
}
};
})
.map_err(|e| Error::Custom(e.to_string()))?;
tasks.push(task);
}
let joined = futures::future::join_all(tasks);
futures::executor::block_on(joined);
if *any_bucket_updated.lock().unwrap() {
let time = Utc::now().to_rfc3339_opts(SecondsFormat::Micros, false);
config.set("last_update", time.as_str())?;
}
if let Some(tx) = emitter {
let _ = tx.send(Event::BucketUpdateDone);
}
Ok(())
}
pub fn bucket_remove(session: &Session, name: &str) -> Fallible<()> {
let mut path = session.config().root_path().to_owned();
path.push("buckets");
path.push(name);
if !path.exists() {
return Err(Error::BucketNotFound(name.to_owned()));
}
Ok(remove_dir_all::remove_dir_all(path.as_path())?)
}
pub fn cache_list(session: &Session, query: &str) -> Fallible<Vec<CacheFile>> {
let is_wildcard_query = query.eq("*") || query.is_empty();
let config = session.config();
let cache_dir = config.cache_path();
let mut files = vec![];
match cache_dir.read_dir() {
Err(err) => {
debug!("failed to read cache dir (err: {})", err);
}
Ok(entires) => {
files = entires
.filter_map(|de| {
if let Ok(entry) = de {
let is_file = entry.file_type().unwrap().is_file();
if is_file {
if let Ok(item) = CacheFile::from(entry.path()) {
if !is_wildcard_query {
let matched = item
.package_name()
.to_lowercase()
.contains(&query.to_lowercase());
if matched {
return Some(item);
} else {
return None;
}
}
return Some(item);
}
}
}
None
})
.collect::<Vec<_>>();
}
}
Ok(files)
}
pub fn cache_remove(session: &Session, query: &str) -> Fallible<()> {
match query {
"*" => {
let config = session.config();
Ok(internal::fs::empty_dir(config.cache_path())?)
}
query => {
let files = cache_list(session, query)?;
for f in files.into_iter() {
std::fs::remove_file(f.path())?;
}
Ok(())
}
}
}
pub fn config_list(session: &Session) -> Fallible<String> {
let config = session.config();
config.pretty()
}
pub fn config_set(session: &Session, key: &str, value: &str) -> Fallible<()> {
session.config_mut()?.set(key, value)
}
pub fn package_hold(session: &Session, name: &str, flag: bool) -> Fallible<()> {
let mut path = session.config().root_path().to_owned();
path.push("apps");
path.push(name);
if !path.exists() {
return Err(Error::PackageHoldNotInstalled(name.to_owned()));
}
path.push("current");
path.push("install.json");
if let Ok(mut install_info) = InstallInfo::parse(&path) {
install_info.set_held(flag);
internal::fs::write_json(path, install_info)
} else {
Err(Error::PackageHoldBrokenInstall(name.to_owned()))
}
}
pub fn package_query(
session: &Session,
queries: Vec<&str>,
options: Vec<QueryOption>,
installed: bool,
) -> Fallible<Vec<Package>> {
let mut queries = HashSet::<&str>::from_iter(queries)
.into_iter()
.collect::<Vec<_>>();
if queries.is_empty() {
queries.push("*");
}
let mut packages = if installed {
package::query::query_installed(session, &queries, &options)?
} else {
package::query::query_synced(session, &queries, &options)?
};
packages.sort_by_key(|p| p.name().to_owned());
Ok(packages)
}
pub fn package_sync(
session: &Session,
queries: Vec<&str>,
options: Vec<SyncOption>,
) -> Fallible<()> {
let queries = HashSet::<&str>::from_iter(queries)
.into_iter()
.collect::<Vec<_>>();
if let Some(tx) = session.emitter() {
let _ = tx.send(Event::PackageResolveStart);
}
let is_op_remove = options.contains(&SyncOption::Remove);
if is_op_remove {
package::sync::remove(session, &queries, &options)?;
} else {
package::sync::install(session, &queries, &options)?;
}
if let Some(tx) = session.emitter() {
let _ = tx.send(Event::PackageSyncDone);
}
Ok(())
}