use std::collections::hash_map::RandomState;
use std::collections::BTreeMap;
use std::collections::HashSet;
use std::path::PathBuf;
use tokio_stream::StreamExt;
use tracing::debug;
use tracing::info;
use url::Url;
use crate::io::manifest::build_manifest_from_rows_stream;
use crate::io::manifest::RowsStream;
use crate::io::remote::Remote;
use crate::io::storage::Storage;
use crate::lineage::PackageLineage;
use crate::lineage::PathState;
use crate::manifest::Row;
use crate::manifest::Table;
use crate::paths::DomainPaths;
use crate::uri::Host;
use crate::uri::Namespace;
use crate::uri::S3Uri;
use crate::Error;
use crate::Res;
/// Fetches the object addressed by `uri` from `remote` (through `host`) and
/// writes its body, as a byte stream, to `object_dest` in `storage`.
///
/// Errors from either the fetch or the write are propagated unchanged.
async fn cache_immutable_object(
    storage: &impl Storage,
    remote: &impl Remote,
    host: &Option<Host>,
    object_dest: &PathBuf,
    uri: &S3Uri,
) -> Res {
    let object_body = remote.get_object_stream(host, uri).await?.body;
    storage.write_byte_stream(object_dest, object_body).await
}
/// Copies `immutable_source` to `mutable_target` in `storage`, first creating
/// any missing parent directories of the target.
///
/// Returns the modification timestamp that `storage` reports for the newly
/// written target.
async fn create_mutable_copy(
    storage: &impl Storage,
    immutable_source: &PathBuf,
    mutable_target: &PathBuf,
) -> Res<chrono::DateTime<chrono::Utc>> {
    if let Some(target_dir) = mutable_target.parent() {
        storage.create_dir_all(target_dir).await?;
    }
    storage.copy(immutable_source, mutable_target).await?;
    storage.modified_timestamp(mutable_target).await
}
/// Streams every row of `remote_manifest`, replacing a row with the entry from
/// `local_entries` whenever one exists under the same name.
///
/// Each stream item is a fallible batch of fallible rows. Errors at either
/// level pass through untouched; only successfully decoded rows are
/// considered for substitution.
async fn stream_remote_with_installed_rows(
    remote_manifest: &Table,
    local_entries: BTreeMap<PathBuf, Row>,
) -> impl RowsStream {
    let remote_rows = remote_manifest.records_stream().await;
    remote_rows.map(move |batch_result| {
        batch_result.map(|batch| {
            batch
                .into_iter()
                .map(|entry| {
                    entry.map(|remote_row| {
                        local_entries
                            .get(&remote_row.name)
                            .cloned()
                            .unwrap_or(remote_row)
                    })
                })
                .collect()
        })
    })
}
#[allow(clippy::too_many_arguments)]
pub async fn install_paths(
mut lineage: PackageLineage,
table: &mut Table,
paths: &DomainPaths,
working_dir: PathBuf, namespace: Namespace,
storage: &(impl Storage + Sync),
remote: &impl Remote,
entries_paths: &[&PathBuf],
) -> Res<PackageLineage> {
if entries_paths.is_empty() {
info!("No paths to install");
return Ok(lineage);
}
info!(
"⏳ Installing {} paths for package {}",
entries_paths.len(),
namespace
);
debug!("🔍 Checking for already installed paths");
if !HashSet::<&PathBuf, RandomState>::from_iter(lineage.paths.keys())
.is_disjoint(&HashSet::from_iter(entries_paths.to_owned()))
{
debug!("❌ Found paths that are already installed");
return Err(Error::InstallPath(
"some paths are already installed".to_string(),
));
}
let mut entries = BTreeMap::new();
for path in entries_paths {
let row = table
.get_record(path)
.await?
.ok_or(Error::Table(format!("path {path:?} not found")))?;
let object_dest = paths.object(row.hash.digest());
if !storage.exists(&object_dest).await {
cache_immutable_object(
storage,
remote,
&lineage.remote.catalog,
&object_dest,
&row.place.parse()?,
)
.await?;
debug!("✔️ Cached object: {}", object_dest.display());
} else {
debug!("✔️ Object already in cache: {}", object_dest.display());
}
let place = Url::from_file_path(&object_dest)
.map_err(|_| {
Error::InstallPath(format!("Failed to create URL from {:?}", &object_dest))
})?
.to_string();
debug!(
"✔️ Path {} converted to a `place` {}",
object_dest.display(),
place
);
entries.insert(
row.name.clone(),
Row {
place,
..row.clone()
},
);
let working_dest = working_dir.join(&row.name);
let last_modified = create_mutable_copy(storage, &object_dest, &working_dest).await?;
debug!(
"✔️ Created mutable copy at {} for {}",
last_modified,
working_dest.display()
);
lineage.paths.insert(
row.name.clone(),
PathState {
timestamp: last_modified,
hash: row.hash,
},
);
debug!("✔️ Added {} to lineage paths ", row.name.display());
}
debug!("⏳ Building manifest with installed rows");
let header = table.get_header().await?;
let stream = stream_remote_with_installed_rows(table, entries).await;
let dest_dir = paths.installed_manifests(&namespace);
build_manifest_from_rows_stream(storage, dest_dir, header, stream).await?;
info!("✔️ Successfully installed {} paths", entries_paths.len());
Ok(lineage)
}
#[cfg(test)]
mod tests {
    use super::*;

    use test_log::test;

    use std::path::PathBuf;
    use std::str::FromStr;

    use crate::fixtures;
    use crate::io::remote::mocks::MockRemote;
    use crate::io::storage::mocks::MockStorage;
    use crate::lineage::Home;
    use crate::manifest::Row;
    use crate::paths;

    /// An object already present in the local cache is copied into the
    /// working directory without touching the remote.
    #[test(tokio::test)]
    async fn test_installing_one_cached_path() -> Res {
        let (home, _temp_dir1) = Home::from_temp_dir()?;
        let (domain_paths, _temp_dir2) = &DomainPaths::from_temp_dir()?;
        let namespace = Namespace::from(("foo", "bar"));
        let package_home = paths::package_home(&home, &namespace);
        let storage = MockStorage::default();
        let lineage = PackageLineage::default();
        let single_object_path = PathBuf::from("less-then-8mb.txt");
        let entries_paths = vec![&single_object_path];
        let mut manifest = fixtures::manifest_with_objects_all_sizes::manifest().await?;
        let hash = manifest
            .get_record(&single_object_path)
            .await?
            .unwrap()
            .hash;
        let object_path = domain_paths.object(hash.digest());
        let absolute_path = home.join(object_path);
        storage.write_file(absolute_path, &Vec::new()).await?;
        assert!(lineage.paths.is_empty());
        let remote = MockRemote::default();
        let lineage = install_paths(
            lineage,
            &mut manifest,
            domain_paths,
            package_home.clone(),
            namespace,
            &storage,
            &remote,
            &entries_paths,
        )
        .await?;
        assert!(lineage.paths.contains_key(&single_object_path));
        assert!(storage.exists(&package_home.join(single_object_path)).await);
        Ok(())
    }

    /// An object missing from the local cache is fetched from the remote,
    /// cached, and copied into the working directory.
    #[test(tokio::test)]
    async fn test_installing_one_uncached_path() -> Res {
        let (home, _temp_dir1) = Home::from_temp_dir()?;
        let (domain_paths, _temp_dir2) = &DomainPaths::from_temp_dir()?;
        let namespace = Namespace::from(("foo", "bar"));
        let package_home = paths::package_home(&home, &namespace);
        let remote = MockRemote::default();
        let storage = MockStorage::default();
        let single_object_path = PathBuf::from("a/a");
        let entries_paths = vec![&single_object_path];
        domain_paths
            .scaffold_for_installing(&storage, &home, &namespace)
            .await?;
        let remote_file_url = "s3://any/valid-url.md".to_string();
        let lineage = PackageLineage::default();
        let remote_object_uri = S3Uri::from_str(&remote_file_url)?;
        remote
            .put_object(&lineage.remote.catalog, &remote_object_uri, Vec::new())
            .await?;
        let hash: multihash::Multihash<256> = multihash::Multihash::wrap(0x12, b"anything")?;
        let mut manifest = Table::default();
        manifest
            .insert_record(Row {
                name: single_object_path.clone(),
                hash,
                place: remote_file_url,
                ..Row::default()
            })
            .await?;
        assert!(lineage.paths.is_empty());
        let lineage = install_paths(
            lineage,
            &mut manifest,
            domain_paths,
            package_home.clone(),
            namespace,
            &storage,
            &remote,
            &entries_paths,
        )
        .await?;
        assert!(lineage.paths.contains_key(&single_object_path));
        assert!(storage.exists(&package_home.join(single_object_path)).await);
        let object_path = domain_paths.object(hash.digest());
        assert!(storage.exists(object_path).await);
        Ok(())
    }

    /// A mix of cached and uncached objects, at different directory depths,
    /// is installed in a single call.
    #[test(tokio::test)]
    async fn test_installing_multiple_paths() -> Res {
        let (home, _temp_dir1) = Home::from_temp_dir()?;
        let (domain_paths, _temp_dir2) = &DomainPaths::from_temp_dir()?;
        let namespace = Namespace::from(("foo", "bar"));
        let package_home = paths::package_home(&home, &namespace);
        let lineage = PackageLineage::default();
        let row_1 = Row {
            name: PathBuf::from("a"),
            place: "file:///ignored".to_string(),
            hash: multihash::Multihash::wrap(0x12, b"one")?,
            ..Row::default()
        };
        let row_2 = Row {
            name: PathBuf::from("b/b"),
            place: "s3://bucket/foo/bar".to_string(),
            hash: multihash::Multihash::wrap(0x12, b"two")?,
            ..Row::default()
        };
        let row_3 = Row {
            name: PathBuf::from("c/c/c"),
            place: "file:///ignored".to_string(),
            hash: multihash::Multihash::wrap(0x12, b"three")?,
            ..Row::default()
        };
        let row_4 = Row {
            name: PathBuf::from("d/d/d/d"),
            place: "s3://bucket/foo/baz".to_string(),
            hash: multihash::Multihash::wrap(0x12, b"four")?,
            ..Row::default()
        };
        let mut manifest = Table::default();
        manifest.insert_record(row_1.clone()).await?;
        manifest.insert_record(row_2.clone()).await?;
        manifest.insert_record(row_3.clone()).await?;
        manifest.insert_record(row_4.clone()).await?;
        let storage = MockStorage::default();
        let object_path_1 = home.join(domain_paths.object(row_1.hash.digest()));
        storage.write_file(object_path_1, &Vec::new()).await?;
        let object_path_3 = home.join(domain_paths.object(row_3.hash.digest()));
        storage.write_file(object_path_3, &Vec::new()).await?;
        let remote = MockRemote::default();
        let remote_object_uri_2 = S3Uri::from_str(&row_2.place)?;
        remote
            .put_object(&lineage.remote.catalog, &remote_object_uri_2, Vec::new())
            .await?;
        let remote_object_uri_4 = S3Uri::from_str(&row_4.place)?;
        remote
            .put_object(&lineage.remote.catalog, &remote_object_uri_4, Vec::new())
            .await?;
        let entries_paths = vec![&row_1.name, &row_2.name, &row_3.name, &row_4.name];
        assert!(lineage.paths.is_empty());
        let lineage = install_paths(
            lineage,
            &mut manifest,
            domain_paths,
            package_home.clone(),
            namespace,
            &storage,
            &remote,
            &entries_paths,
        )
        .await?;
        assert!(lineage.paths.contains_key(&row_1.name));
        assert!(lineage.paths.contains_key(&row_2.name));
        assert!(lineage.paths.contains_key(&row_3.name));
        assert!(lineage.paths.contains_key(&row_4.name));
        assert!(storage.exists(&package_home.join(row_1.name)).await);
        assert!(storage.exists(&package_home.join(row_2.name)).await);
        assert!(storage.exists(&package_home.join(row_3.name)).await);
        assert!(storage.exists(&package_home.join(row_4.name)).await);
        Ok(())
    }

    /// Requesting a path absent from the manifest yields a `Table` error.
    #[test(tokio::test)]
    async fn test_installing_path_that_doesnt_exists_in_manifest() -> Res {
        let lineage = PackageLineage::default();
        let remote = MockRemote::default();
        let storage = MockStorage::default();
        let not_existed = PathBuf::from("z/z");
        let entries_paths = vec![&not_existed];
        let mut manifest = fixtures::manifest_with_objects_all_sizes::manifest().await?;
        assert!(lineage.paths.is_empty());
        let result = install_paths(
            lineage,
            &mut manifest,
            &DomainPaths::default(),
            PathBuf::new(),
            Namespace::default(),
            &storage,
            &remote,
            &entries_paths,
        )
        .await;
        assert_eq!(
            result.unwrap_err().to_string(),
            r#"Table error: path "z/z" not found"#
        );
        Ok(())
    }

    /// Requesting a path that the lineage already tracks is rejected before
    /// any manifest lookup or I/O happens.
    #[test(tokio::test)]
    async fn test_installing_already_installed_path() -> Res {
        let mut lineage = PackageLineage::default();
        let installed = PathBuf::from("a/a");
        lineage.paths.insert(
            installed.clone(),
            PathState {
                timestamp: chrono::Utc::now(),
                hash: multihash::Multihash::wrap(0x12, b"anything")?,
            },
        );
        let remote = MockRemote::default();
        let storage = MockStorage::default();
        let entries_paths = vec![&installed];
        let mut manifest = Table::default();
        let result = install_paths(
            lineage,
            &mut manifest,
            &DomainPaths::default(),
            PathBuf::new(),
            Namespace::default(),
            &storage,
            &remote,
            &entries_paths,
        )
        .await;
        assert!(matches!(result, Err(Error::InstallPath(_))));
        Ok(())
    }

    /// Installing more paths than a single batch (1024) still installs all of them.
    #[test(tokio::test)]
    async fn test_installing_more_than_1024_paths() -> Res {
        let (home, _temp_dir1) = Home::from_temp_dir()?;
        let (domain_paths, _temp_dir2) = &DomainPaths::from_temp_dir()?;
        let namespace = Namespace::from(("foo", "bar"));
        let package_home = paths::package_home(&home, &namespace);
        let lineage = PackageLineage::default();
        let storage = MockStorage::default();
        let remote = MockRemote::default();
        let mut manifest = Table::default();
        let mut entries_paths = Vec::new();
        for i in 0..2048 {
            let path = PathBuf::from(format!("path_{}.txt", i));
            let place = format!("s3://bucket/path_{}.txt", i);
            let hash = multihash::Multihash::wrap(0x12, format!("hash_{}", i).as_bytes())?;
            let row = Row {
                name: path.clone(),
                place: place.clone(),
                hash,
                ..Row::default()
            };
            manifest.insert_record(row).await?;
            entries_paths.push(path);
            let remote_uri = S3Uri::from_str(&place)?;
            remote
                .put_object(&lineage.remote.catalog, &remote_uri, Vec::new())
                .await?;
        }
        let path_refs: Vec<&PathBuf> = entries_paths.iter().collect();
        domain_paths
            .scaffold_for_installing(&storage, &home, &namespace)
            .await?;
        assert!(lineage.paths.is_empty());
        let lineage = install_paths(
            lineage,
            &mut manifest,
            domain_paths,
            package_home.clone(),
            namespace,
            &storage,
            &remote,
            &path_refs,
        )
        .await?;
        assert_eq!(lineage.paths.len(), 2048);
        for path in &entries_paths {
            assert!(lineage.paths.contains_key(path));
        }
        for path in &entries_paths {
            assert!(storage.exists(&package_home.join(path)).await);
        }
        Ok(())
    }
}