use std::collections::{HashMap, HashSet};
use std::convert::TryInto;
use std::fs;
use std::fs::File;
use std::io::Write;
use std::ops::Deref;
use std::path::{Path, PathBuf};
use std::rc::Rc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::RwLock;
use chrono::Local;
use log::{info, warn};
use once_cell::sync::OnceCell;
#[cfg(feature = "s3")]
use rusoto_core::Region;
use walkdir::WalkDir;
use crate::ocfl::consts::*;
use crate::ocfl::digest::HexDigest;
use crate::ocfl::error::{MultiError, Result, RocflError};
use crate::ocfl::inventory::Inventory;
use crate::ocfl::lock::LockManager;
use crate::ocfl::store::fs::FsOcflStore;
use crate::ocfl::store::layout::{LayoutExtensionName, StorageLayout};
#[cfg(feature = "s3")]
use crate::ocfl::store::s3::S3OcflStore;
use crate::ocfl::store::{OcflStore, StagingStore};
use crate::ocfl::validate::ObjectValidationResult;
use crate::ocfl::Knowable::*;
use crate::ocfl::{
paths, util, validate, CommitMeta, ContentPath, Diff, DigestAlgorithm, IncrementalValidator,
InventoryPath, Knowable, LogicalPath, ObjectInfo, ObjectVersion, ObjectVersionDetails,
RepoInfo, SpecVersion, VersionDetails, VersionNum, VersionRef,
};
/// Handle to an OCFL repository. It pairs the main object store with a local
/// filesystem staging store in which changes are accumulated before they are
/// committed to the main store.
pub struct OcflRepo {
/// Store that holds the committed objects (filesystem or, with the `s3` feature, S3).
store: Box<dyn OcflStore + Sync + Send>,
/// Staging store rooted at `staging_root`; initialized on first use by `get_staging`.
staging: OnceCell<FsOcflStore>,
/// Lock manager used to serialize staging operations per object; initialized on first
/// use by `get_lock_manager`.
staging_lock_manager: OnceCell<LockManager>,
/// Directory in which the staging store lives.
staging_root: PathBuf,
/// Spec version of the repository as reported by the store when it was opened, or the
/// version it was initialized/upgraded to. `None` presumably means the store did not
/// report a version -- TODO confirm against `repo_spec_version`.
spec_version: RwLock<Option<Knowable<SpecVersion, String>>>,
/// Flag forwarded to `ObjectVersion::from_inventory`; `util::BACKSLASH_SEPARATOR` for
/// filesystem repositories and `false` for S3 repositories.
use_backslashes: bool,
/// Set by `close`; once set, public operations fail with `RocflError::Closed` and
/// long-running loops stop early.
closed: AtomicBool,
}
impl OcflRepo {
/// Opens an existing OCFL repository located on the local filesystem.
///
/// * `storage_root` - root directory of the repository
/// * `staging` - optional staging directory; when `None` the location is derived from
///   `storage_root` with `paths::staging_extension_path`
///
/// # Errors
///
/// Propagates any failure from opening the store or reading its spec version.
pub fn fs_repo(storage_root: impl AsRef<Path>, staging: Option<&Path>) -> Result<Self> {
    let staging_root = staging
        .map(Path::to_path_buf)
        .unwrap_or_else(|| paths::staging_extension_path(storage_root.as_ref()));

    let fs_store = FsOcflStore::new(storage_root)?;
    let detected_version = fs_store.repo_spec_version()?;

    Ok(Self {
        store: Box::new(fs_store),
        staging: OnceCell::default(),
        staging_lock_manager: OnceCell::default(),
        staging_root,
        spec_version: RwLock::new(detected_version),
        use_backslashes: util::BACKSLASH_SEPARATOR,
        closed: AtomicBool::new(false),
    })
}
/// Initializes a new OCFL repository on the local filesystem and opens it.
///
/// * `storage_root` - directory in which the repository is initialized
/// * `staging` - optional staging directory; when `None` the location is derived from
///   `storage_root` with `paths::staging_extension_path`
/// * `version` - OCFL spec version recorded for the new repository
/// * `layout` - optional storage layout handed to the store initializer
///
/// # Errors
///
/// Propagates any failure from `FsOcflStore::init`.
pub fn init_fs_repo(
    storage_root: impl AsRef<Path>,
    staging: Option<&Path>,
    version: SpecVersion,
    layout: Option<StorageLayout>,
) -> Result<Self> {
    let staging_root = staging
        .map(Path::to_path_buf)
        .unwrap_or_else(|| paths::staging_extension_path(storage_root.as_ref()));

    let fs_store = FsOcflStore::init(storage_root, version, layout)?;

    Ok(Self {
        store: Box::new(fs_store),
        staging: OnceCell::default(),
        staging_lock_manager: OnceCell::default(),
        staging_root,
        spec_version: RwLock::new(Some(Known(version))),
        use_backslashes: util::BACKSLASH_SEPARATOR,
        closed: AtomicBool::new(false),
    })
}
/// Initializes a new OCFL repository in S3 and opens it.
///
/// * `region`, `bucket`, `prefix`, `profile` - forwarded unchanged to `S3OcflStore::init`
/// * `staging_root` - local directory used for the staging store
/// * `version` - OCFL spec version recorded for the new repository
/// * `layout` - optional storage layout handed to the store initializer
///
/// # Errors
///
/// Propagates any failure from `S3OcflStore::init`.
#[cfg(feature = "s3")]
pub fn init_s3_repo(
    region: Region,
    bucket: &str,
    prefix: Option<&str>,
    profile: Option<&str>,
    staging_root: impl AsRef<Path>,
    version: SpecVersion,
    layout: Option<StorageLayout>,
) -> Result<Self> {
    let staging_root = staging_root.as_ref().to_path_buf();
    let s3_store = S3OcflStore::init(region, bucket, prefix, profile, version, layout)?;

    Ok(Self {
        store: Box::new(s3_store),
        staging: OnceCell::default(),
        staging_lock_manager: OnceCell::default(),
        staging_root,
        spec_version: RwLock::new(Some(Known(version))),
        use_backslashes: false,
        closed: AtomicBool::new(false),
    })
}
/// Opens an existing OCFL repository stored in S3.
///
/// * `region`, `bucket`, `prefix`, `profile` - forwarded unchanged to `S3OcflStore::new`
/// * `staging_root` - local directory used for the staging store
///
/// # Errors
///
/// Propagates any failure from opening the store or reading its spec version.
#[cfg(feature = "s3")]
pub fn s3_repo(
    region: Region,
    bucket: &str,
    prefix: Option<&str>,
    staging_root: impl AsRef<Path>,
    profile: Option<&str>,
) -> Result<Self> {
    let s3_store = S3OcflStore::new(region, bucket, prefix, profile)?;
    let detected_version = s3_store.repo_spec_version()?;
    let staging_root = staging_root.as_ref().to_path_buf();

    Ok(Self {
        store: Box::new(s3_store),
        staging: OnceCell::default(),
        staging_lock_manager: OnceCell::default(),
        staging_root,
        spec_version: RwLock::new(detected_version),
        use_backslashes: false,
        closed: AtomicBool::new(false),
    })
}
/// Marks the repository as closed and then closes the underlying store.
///
/// After this call the public operations on this handle return `RocflError::Closed`,
/// and loops that poll the closed flag stop at their next check.
pub fn close(&self) {
    info!("Closing OCFL repository");

    // The flag is raised before the store is closed so in-flight loops see it first.
    self.closed.store(true, Ordering::Release);
    self.store.close();
}
/// Validates the object with the given id by delegating to the store.
///
/// `fixity_check` is forwarded to the store unchanged.
///
/// # Errors
///
/// Returns `RocflError::Closed` if the repository has been closed, otherwise whatever
/// the store's validation returns.
pub fn validate_object(
    &self,
    object_id: &str,
    fixity_check: bool,
) -> Result<ObjectValidationResult> {
    self.ensure_open()?;

    let store = &self.store;
    store.validate_object(object_id, fixity_check)
}
/// Validates the object located at `path` by delegating to the store.
///
/// `fixity_check` is forwarded to the store unchanged.
///
/// # Errors
///
/// Returns `RocflError::Closed` if the repository has been closed, otherwise whatever
/// the store's validation returns.
pub fn validate_object_at(
    &self,
    path: &str,
    fixity_check: bool,
) -> Result<ObjectValidationResult> {
    self.ensure_open()?;

    let store = &self.store;
    store.validate_object_at(path, fixity_check)
}
/// Returns the store's incremental validator for the whole repository.
///
/// `fixity_check` is forwarded to the store unchanged. The returned validator borrows
/// from this repository handle.
///
/// # Errors
///
/// Returns `RocflError::Closed` if the repository has been closed, otherwise whatever
/// the store returns.
pub fn validate_repo<'a>(
    &'a self,
    fixity_check: bool,
) -> Result<Box<dyn IncrementalValidator + 'a>> {
    self.ensure_open()?;

    let store = &self.store;
    store.validate_repo(fixity_check)
}
/// Returns the store's description of the repository.
///
/// # Errors
///
/// Returns `RocflError::Closed` if the repository has been closed, otherwise whatever
/// the store returns.
pub fn describe_repo(&self) -> Result<RepoInfo> {
    self.ensure_open()?;

    let store = &self.store;
    store.describe_repo()
}
/// Returns the store's description of the committed object with the given id.
///
/// # Errors
///
/// Returns `RocflError::Closed` if the repository has been closed, otherwise whatever
/// the store returns.
pub fn describe_object(&self, object_id: &str) -> Result<ObjectInfo> {
    self.ensure_open()?;

    let store = &self.store;
    store.describe_object(object_id)
}
/// Returns the staging store's description of the staged version of an object.
///
/// # Errors
///
/// * `RocflError::Closed` if the repository has been closed
/// * `RocflError::NotFound` if the staging directory does not exist or the staging
///   store has no such object
/// * any other error reported by the staging store
pub fn describe_staged_object(&self, object_id: &str) -> Result<ObjectInfo> {
    self.ensure_open()?;

    let not_staged =
        || RocflError::NotFound(format!("{} does not have a staged version.", object_id));

    // Avoid creating the staging store just to learn that nothing is staged.
    if !self.staging_root.exists() {
        return Err(not_staged());
    }

    self.get_staging()?
        .describe_object(object_id)
        .map_err(|e| match e {
            RocflError::NotFound(_) => not_staged(),
            other => other,
        })
}
/// Iterates over the committed objects, yielding the head-version details of each.
///
/// `filter_glob` is forwarded to the store's inventory iterator unchanged.
///
/// # Errors
///
/// Returns `RocflError::Closed` if the repository has been closed, or the store's error
/// if the inventory iterator cannot be created. Per-object failures are yielded as
/// `Err` items by the returned iterator.
pub fn list_objects<'a>(
    &'a self,
    filter_glob: Option<&str>,
) -> Result<Box<dyn Iterator<Item = Result<ObjectVersionDetails>> + 'a>> {
    self.ensure_open()?;

    let inventories = self.store.iter_inventories(filter_glob)?;
    let head_details = InventoryAdapterIter::new(inventories, |inv| {
        ObjectVersionDetails::from_inventory(inv, VersionRef::Head)
    });

    Ok(Box::new(head_details))
}
/// Iterates over the objects that have staged changes, yielding the head-version
/// details of each staged inventory.
///
/// `filter_glob` is forwarded to the staging store's inventory iterator unchanged.
/// When the staging directory does not exist, an empty iterator is returned.
///
/// # Errors
///
/// Returns `RocflError::Closed` if the repository has been closed, or the staging
/// store's error if the inventory iterator cannot be created.
pub fn list_staged_objects<'a>(
    &'a self,
    filter_glob: Option<&str>,
) -> Result<Box<dyn Iterator<Item = Result<ObjectVersionDetails>> + 'a>> {
    self.ensure_open()?;

    let staging_exists = self.staging_root.exists();
    if !staging_exists {
        return Ok(Box::new(Vec::new().into_iter()));
    }

    let inventories = self.get_staging()?.iter_inventories(filter_glob)?;
    let head_details = InventoryAdapterIter::new(inventories, |inv| {
        ObjectVersionDetails::from_inventory(inv, VersionRef::Head)
    });

    Ok(Box::new(head_details))
}
/// Loads the requested version of a committed object.
///
/// The object's own storage path is used as its root and no staging root is supplied.
///
/// # Errors
///
/// Returns `RocflError::Closed` if the repository has been closed, or any error from
/// loading the inventory or building the object version.
pub fn get_object(&self, object_id: &str, version_num: VersionRef) -> Result<ObjectVersion> {
    self.ensure_open()?;

    let inventory = self.store.get_inventory(object_id)?;
    // Cloned because the inventory itself is moved into `from_inventory`.
    let root = inventory.storage_path.clone();

    ObjectVersion::from_inventory(inventory, version_num, &root, None, self.use_backslashes)
}
/// Loads the staged head version of an object.
///
/// When the object also exists in the main store, its committed storage path is used as
/// the object root and the staged storage path as the staging root. Otherwise the staged
/// storage path is the object root and no staging root is supplied.
///
/// # Errors
///
/// Returns `RocflError::Closed` if the repository has been closed, `RocflError::NotFound`
/// if nothing is staged for the object, or any other error from the stores.
pub fn get_staged_object(&self, object_id: &str) -> Result<ObjectVersion> {
    self.ensure_open()?;

    let staged = self.get_staged_inventory(object_id)?;
    let head = staged.head;
    let staged_root = staged.storage_path.clone();

    let (root, staging) = match self.store.get_inventory(object_id) {
        Ok(committed) => (committed.storage_path, Some(staged_root)),
        Err(RocflError::NotFound(_)) => (staged_root, None),
        Err(e) => return Err(e),
    };

    ObjectVersion::from_inventory(
        staged,
        head.into(),
        &root,
        staging.as_ref(),
        util::BACKSLASH_SEPARATOR,
    )
}
/// Returns the details of the requested version of a committed object.
///
/// # Errors
///
/// Returns `RocflError::Closed` if the repository has been closed, or any error from
/// loading the inventory or extracting the version details.
pub fn get_object_details(
    &self,
    object_id: &str,
    version_num: VersionRef,
) -> Result<ObjectVersionDetails> {
    self.ensure_open()?;

    self.store
        .get_inventory(object_id)
        .and_then(|inventory| ObjectVersionDetails::from_inventory(inventory, version_num))
}
/// Returns the details of the staged head version of an object.
///
/// # Errors
///
/// Returns `RocflError::Closed` if the repository has been closed, `RocflError::NotFound`
/// if nothing is staged for the object, or any other error from the staging store.
pub fn get_staged_object_details(&self, object_id: &str) -> Result<ObjectVersionDetails> {
    self.ensure_open()?;

    let staged = self.get_staged_inventory(object_id)?;
    let head = staged.head;

    ObjectVersionDetails::from_inventory(staged, head.into())
}
/// Returns the details of every version of a committed object, in the iteration order
/// of the inventory's version collection.
///
/// # Errors
///
/// Returns `RocflError::Closed` if the repository has been closed, or any error from
/// loading the inventory.
pub fn list_object_versions(&self, object_id: &str) -> Result<Vec<VersionDetails>> {
    self.ensure_open()?;

    let inventory = self.store.get_inventory(object_id)?;

    let all_versions = inventory
        .versions
        .into_iter()
        .map(|(num, version)| VersionDetails::from_version(num, version))
        .collect();

    Ok(all_versions)
}
/// Writes the content of a file from a committed object version to `sink`.
///
/// All arguments are forwarded to the store unchanged.
///
/// # Errors
///
/// Returns `RocflError::Closed` if the repository has been closed, otherwise whatever
/// the store returns.
pub fn get_object_file(
    &self,
    object_id: &str,
    path: &LogicalPath,
    version_num: VersionRef,
    sink: &mut dyn Write,
) -> Result<()> {
    self.ensure_open()?;

    let store = &self.store;
    store.get_object_file(object_id, path, version_num, sink)
}
/// Writes the content of a file from the staged version of an object to `sink`.
///
/// The file is read from the staging store when its content path lies under the staged
/// head version's directory; otherwise it is read from the main store at the version
/// preceding the staged head.
///
/// # Errors
///
/// Returns `RocflError::Closed` if the repository has been closed, `RocflError::NotFound`
/// if nothing is staged for the object, or any error from resolving or reading the file.
pub fn get_staged_object_file(
    &self,
    object_id: &str,
    path: &LogicalPath,
    sink: &mut dyn Write,
) -> Result<()> {
    self.ensure_open()?;

    let staged = self.get_staged_inventory(object_id)?;
    let content_path = staged.content_path_for_logical_path(path, VersionRef::Head)?;

    let head_prefix = format!("{}/", staged.head);
    let stored_in_staging = content_path.starts_with(&head_prefix);

    if !stored_in_staging {
        return self
            .store
            .get_object_file(object_id, path, staged.head.previous()?.into(), sink);
    }

    self.get_staging()?
        .get_object_file(object_id, path, VersionRef::Head, sink)
}
/// Returns the versions of a committed object in which the file at `path` changed.
///
/// A version is reported when the digest recorded for `path` differs from the digest in
/// the preceding version, including the version where the path first appears and any
/// version where a previously present path is absent.
///
/// # Errors
///
/// * `RocflError::Closed` if the repository has been closed
/// * `RocflError::NotFound` if the path never appears in the object
/// * any error from loading the inventory
pub fn list_file_versions(
    &self,
    object_id: &str,
    path: &LogicalPath,
) -> Result<Vec<VersionDetails>> {
    self.ensure_open()?;

    let inventory = self.store.get_inventory(object_id)?;

    let mut history = Vec::new();
    let mut last_seen: Option<Rc<HexDigest>> = None;

    for (num, version) in inventory.versions {
        // Cloning the `Rc` releases the borrow on `version` so it can be moved below.
        let digest_now = version.lookup_digest(path).cloned();

        let changed = match (&last_seen, &digest_now) {
            (None, None) => false,
            (Some(before), Some(now)) => before.as_ref().ne(now),
            _ => true,
        };

        if changed {
            last_seen = digest_now;
            history.push(VersionDetails::from_version(num, version));
        }
    }

    if history.is_empty() {
        Err(RocflError::NotFound(format!(
            "Path {} not found in object {}",
            path, object_id
        )))
    } else {
        Ok(history)
    }
}
/// Computes the differences between two versions of a committed object.
///
/// `left_version` and `right_version` are forwarded to the inventory's `diff_versions`
/// unchanged.
///
/// # Errors
///
/// Returns `RocflError::Closed` if the repository has been closed, or any error from
/// loading the inventory or computing the diff.
pub fn diff(
    &self,
    object_id: &str,
    left_version: Option<VersionNum>,
    right_version: VersionNum,
) -> Result<Vec<Diff>> {
    self.ensure_open()?;

    let inventory = self.store.get_inventory(object_id)?;
    inventory.diff_versions(left_version, right_version)
}
/// Computes the diff for the staged head version of an object.
///
/// Returns an empty list when the staging directory does not exist or nothing is staged
/// for the object.
///
/// # Errors
///
/// Returns `RocflError::Closed` if the repository has been closed, or any error other
/// than not-found from the staging store or from computing the diff.
pub fn diff_staged(&self, object_id: &str) -> Result<Vec<Diff>> {
    self.ensure_open()?;

    if !self.staging_root.exists() {
        return Ok(Vec::new());
    }

    let staged = match self.get_staging()?.get_inventory(object_id) {
        Ok(inventory) => inventory,
        Err(RocflError::NotFound(_)) => return Ok(Vec::new()),
        Err(e) => return Err(e),
    };

    staged.diff_versions(None, staged.head)
}
/// Purges an object from the staging store (when a staging directory exists) and then
/// from the main store.
///
/// If the repository is closed after the staged copy has been purged, the main store is
/// left untouched and `Ok(())` is returned.
///
/// # Errors
///
/// Returns `RocflError::Closed` if the repository was already closed on entry, or any
/// error from either store.
pub fn purge_object(&self, object_id: &str) -> Result<()> {
    self.ensure_open()?;

    if self.staging_root.exists() {
        self.get_staging()?.purge_object(object_id)?;
    }

    // Re-check: the repository may have been closed while staging was being purged.
    if self.is_closed() {
        return Ok(());
    }

    self.store.purge_object(object_id)
}
/// Stages a new, empty object.
///
/// * `object_id` - id of the object; surrounding whitespace is trimmed
/// * `spec_version` - spec version for the object; when `None` the repository version is
///   used if it is known, otherwise `DEFAULT_VERSION`
/// * `digest_algorithm` - digest algorithm recorded in the inventory
/// * `content_dir` - content directory recorded in the inventory
/// * `padding_width` - width used for the first version number
///
/// # Errors
///
/// * `RocflError::Closed` if the repository has been closed
/// * a validation error if the spec version, id, digest algorithm or content directory
///   is rejected by the `validate` helpers
/// * `RocflError::IllegalState` if the object already exists in the main store
/// * any error from locking, building the inventory or staging it
pub fn create_object(
    &self,
    object_id: &str,
    spec_version: Option<SpecVersion>,
    digest_algorithm: DigestAlgorithm,
    content_dir: &str,
    padding_width: u32,
) -> Result<()> {
    self.ensure_open()?;

    let object_id = object_id.trim();
    let repo_version = self.spec_version.read().unwrap().clone();

    let object_version = match (spec_version, &repo_version) {
        (Some(requested), Some(Known(repo))) => {
            validate::validate_spec_version(requested, *repo)?;
            requested
        }
        (Some(requested), _) => requested,
        (None, Some(Known(repo))) => {
            info!("Defaulting object version to {}", repo.version());
            *repo
        }
        (None, _) => {
            info!(
                "Unknown repository version, defaulting object version to {}",
                DEFAULT_VERSION.version()
            );
            DEFAULT_VERSION
        }
    };

    validate::validate_object_id(object_id)?;
    validate::validate_digest_algorithm(digest_algorithm)?;
    validate::validate_content_dir(content_dir)?;

    let _lock = self.get_lock_manager()?.acquire(object_id)?;

    // Only a not-found answer from the main store allows creation to proceed.
    match self.store.get_inventory(object_id) {
        Ok(_) => {
            return Err(RocflError::IllegalState(format!(
                "Cannot create object {} because it already exists",
                object_id
            )));
        }
        Err(RocflError::NotFound(_)) => {}
        Err(e) => return Err(e),
    }

    let first_version = VersionNum::v1_with_width(padding_width);
    let mut new_inventory = Inventory::builder(object_id, object_version)
        .with_digest_algorithm(digest_algorithm)
        .with_content_directory(content_dir)
        .with_head(first_version)
        .build()?;

    self.get_staging()?.stage_object(&mut new_inventory)
}
pub fn copy_files_external(
&self,
object_id: &str,
src: &[impl AsRef<Path>],
dst: &str,
recursive: bool,
) -> Result<()> {
self.ensure_open()?;
self.operate_on_external_source(
object_id,
src,
dst,
recursive,
|file, logical_path, inventory| self.copy_file(file, logical_path, inventory),
)
}
/// Copies files that already exist inside an object to new logical paths in its staged
/// version.
///
/// * `version_num` - version the sources are read from, resolved against the staged head
/// * `src` - source logical paths or globs
/// * `dst` - destination logical path
/// * `recursive` - whether matching directories are copied with their contents
///
/// An empty `src` is a no-op. Per-file failures are collected; the staged inventory is
/// still written before they are reported.
///
/// # Errors
///
/// * `RocflError::Closed` if the repository has been closed
/// * `RocflError::CopyMoveError` aggregating every per-file failure
/// * any error from locking, loading the staged inventory, resolving the sources or
///   writing the staged inventory
pub fn copy_files_internal(
    &self,
    object_id: &str,
    version_num: VersionRef,
    src: &[impl AsRef<str>],
    dst: &str,
    recursive: bool,
) -> Result<()> {
    self.ensure_open()?;

    if src.is_empty() {
        return Ok(());
    }

    let _lock = self.get_lock_manager()?.acquire(object_id)?;

    let mut inventory = self.get_or_created_staged_inventory(object_id)?;
    let src_version_num = version_num.resolve(inventory.head);
    let staging = self.get_staging()?;

    let (planned, mut errors) =
        self.resolve_internal_moves(&inventory, src_version_num, src, dst, recursive)?;

    for (src_path, dst_path) in planned {
        if self.is_closed() {
            break;
        }

        // The closure consumes `dst_path`, so it can only be called once.
        let copy_one = || -> Result<()> {
            info!(
                "Copying file {} from {} to {}",
                src_path, src_version_num, dst_path
            );

            let staged_source =
                lookup_staged_digest_and_content_path(&inventory, src_version_num, &src_path)?;

            match staged_source {
                // The source content lives in staging: duplicate the staged file.
                Some((digest, content_path)) => {
                    inventory
                        .head_version()
                        .validate_non_conflicting(&dst_path)?;
                    staging.copy_staged_file(&inventory, &content_path, &dst_path)?;
                    inventory.add_file_to_head(digest, dst_path)
                }
                // Otherwise only the inventory needs to change.
                None => inventory.copy_file_to_head(src_version_num, &src_path, dst_path),
            }
        };

        if let Err(e) = copy_one() {
            errors.push(format!("Failed to copy file {}: {}", src_path, e));
        }
    }

    inventory.head_version_mut().created = Local::now();
    staging.stage_inventory(&inventory, false, false)?;

    if errors.is_empty() {
        Ok(())
    } else {
        Err(RocflError::CopyMoveError(MultiError(errors)))
    }
}
pub fn move_files_external(
&self,
object_id: &str,
src: &[impl AsRef<Path>],
dst: &str,
) -> Result<()> {
self.ensure_open()?;
self.operate_on_external_source(
object_id,
src,
dst,
true,
|file, logical_path, inventory| self.move_file(file, logical_path, inventory),
)?;
if self.is_open() {
for path in src {
let path = path.as_ref();
if path.exists() && path.is_dir() {
util::clean_dirs_down(path)?;
}
}
}
Ok(())
}
/// Moves files that exist in the staged head version of an object to new logical paths.
///
/// * `src` - source logical paths or globs, resolved against the staged head
/// * `dst` - destination logical path
///
/// An empty `src` is a no-op. Directories are always moved with their contents.
/// Per-file failures are collected; the staged inventory is still written before they
/// are reported.
///
/// # Errors
///
/// * `RocflError::Closed` if the repository has been closed
/// * `RocflError::CopyMoveError` aggregating every per-file failure
/// * any error from locking, loading the staged inventory, resolving the sources or
///   writing the staged inventory
pub fn move_files_internal(
    &self,
    object_id: &str,
    src: &[impl AsRef<str>],
    dst: &str,
) -> Result<()> {
    self.ensure_open()?;

    if src.is_empty() {
        return Ok(());
    }

    let _lock = self.get_lock_manager()?.acquire(object_id)?;

    let mut inventory = self.get_or_created_staged_inventory(object_id)?;
    let staging = self.get_staging()?;

    let (planned, mut errors) =
        self.resolve_internal_moves(&inventory, inventory.head, src, dst, true)?;

    for (src_path, dst_path) in planned {
        if self.is_closed() {
            break;
        }

        info!("Moving {} to {}", src_path, dst_path);

        // The closure consumes `dst_path`, so it can only be called once.
        let move_one = || -> Result<()> {
            let staged_source =
                lookup_staged_digest_and_content_path(&inventory, inventory.head, &src_path)?;

            match staged_source {
                // The source content lives in staging: relocate the staged file too.
                Some((digest, content_path)) => {
                    inventory
                        .head_version()
                        .validate_non_conflicting(&dst_path)?;
                    staging.move_staged_file(&inventory, &content_path, &dst_path)?;
                    inventory.move_new_in_head_file(digest, &src_path, dst_path)
                }
                // Otherwise only the inventory needs to change.
                None => inventory.move_file_in_head(&src_path, dst_path),
            }
        };

        if let Err(e) = move_one() {
            errors.push(format!("Failed to move file {}: {}", src_path, e));
        }
    }

    inventory.head_version_mut().created = Local::now();
    staging.stage_inventory(&inventory, false, false)?;

    if errors.is_empty() {
        Ok(())
    } else {
        Err(RocflError::CopyMoveError(MultiError(errors)))
    }
}
pub fn remove_files<P: AsRef<str>>(
&self,
object_id: &str,
paths: &[P],
recursive: bool,
) -> Result<()> {
self.ensure_open()?;
if paths.is_empty() {
return Ok(());
}
let _lock = self.get_lock_manager()?.acquire(object_id)?;
let mut inventory = self.get_or_created_staged_inventory(object_id)?;
let version = inventory.head_version();
let mut paths_to_remove = HashSet::new();
for path in paths {
paths_to_remove.extend(version.resolve_glob(path.as_ref(), recursive)?);
}
let staging = self.get_staging()?;
for path in paths_to_remove {
if self.is_closed() {
break;
}
info!("Removing path from staged version: {}", path);
if let Some(content_path) = inventory.remove_logical_path_from_head(&path) {
staging.rm_staged_files(&inventory, &[&content_path])?;
}
}
staging.stage_inventory(&inventory, false, false)?;
Ok(())
}
/// Discards every staged change of an object by purging it from the staging store.
///
/// Does nothing when the staging directory does not exist.
///
/// # Errors
///
/// Returns `RocflError::Closed` if the repository has been closed, or any error from
/// the staging store.
pub fn reset_all(&self, object_id: &str) -> Result<()> {
    self.ensure_open()?;

    if !self.staging_root.exists() {
        return Ok(());
    }

    self.get_staging()?.purge_object(object_id)
}
pub fn reset<P: AsRef<str>>(
&self,
object_id: &str,
paths: &[P],
recursive: bool,
) -> Result<()> {
self.ensure_open()?;
if paths.is_empty() {
return Ok(());
}
let staging = self.get_staging()?;
let _lock = self.get_lock_manager()?.acquire(object_id)?;
let mut inventory = match staging.get_inventory(object_id) {
Ok(inventory) => inventory,
Err(RocflError::NotFound(_)) => return Ok(()),
Err(e) => return Err(e),
};
let head = inventory.head_version();
let (previous, previous_num) = if inventory.is_new() {
(None, None)
} else {
let previous_num = inventory.head.previous()?;
(
Some(inventory.get_version(previous_num)?),
Some(previous_num),
)
};
let mut head_paths = HashSet::new();
let mut previous_paths = HashSet::new();
for path in paths {
head_paths.extend(head.resolve_glob(path.as_ref(), recursive)?);
if let Some(previous) = previous {
previous_paths.extend(previous.resolve_glob(path.as_ref(), recursive)?);
}
}
let reset_adds = head_paths
.into_iter()
.filter(|path| !previous_paths.contains(path))
.collect::<HashSet<Rc<LogicalPath>>>();
for path in reset_adds {
if self.is_closed() {
break;
}
if let Some(content_path) = inventory.remove_logical_path_from_head(&path) {
staging.rm_staged_files(&inventory, &[&content_path])?;
}
}
for path in previous_paths {
if self.is_closed() {
break;
}
if let Some(previous_num) = previous_num {
if let Some(content_path) = inventory.remove_logical_path_from_head(&path) {
staging.rm_staged_files(&inventory, &[&content_path])?;
}
inventory.copy_file_to_head(previous_num, &path, path.as_ref().clone())?;
}
}
inventory.head_version_mut().created = Local::now();
staging.stage_inventory(&inventory, false, false)
}
/// Commits the staged changes of an object to the main store.
///
/// * `meta` - metadata applied to the new version
/// * `object_root` - optional root forwarded to the store when a new object is written
/// * `pretty_print` - forwarded to the staging store when the final inventory is written
///
/// # Errors
///
/// Returns `RocflError::Closed` if the repository has been closed, or any error from
/// locking or from `commit_inner`.
pub fn commit(
    &self,
    object_id: &str,
    meta: CommitMeta,
    object_root: Option<&str>,
    pretty_print: bool,
) -> Result<()> {
    self.ensure_open()?;

    let staging = self.get_staging()?;
    // Held until the commit has finished.
    let _lock = self.get_lock_manager()?.acquire(object_id)?;

    self.commit_inner(object_id, meta, object_root, pretty_print, staging)
}
/// Upgrades an object to a newer OCFL spec version and commits the change.
///
/// * `version` - spec version to upgrade the object to
/// * `meta` - metadata applied to the version created by the upgrade
/// * `pretty_print` - forwarded to the staging store when the final inventory is written
///
/// # Errors
///
/// * `RocflError::Closed` if the repository has been closed
/// * `RocflError::IllegalOperation` if the object's current spec version is unknown, if
///   `version` is not newer than the current version, if `version` is greater than the
///   repository's version, or if the repository's version is unrecognized
/// * any error from locking, staging or committing
pub fn upgrade_object(
    &self,
    object_id: &str,
    version: SpecVersion,
    meta: CommitMeta,
    pretty_print: bool,
) -> Result<()> {
    self.ensure_open()?;

    let staging = self.get_staging()?;
    let _lock = self.get_lock_manager()?.acquire(object_id)?;

    let mut inventory = self.get_or_created_staged_inventory(object_id)?;

    let current_version = match inventory.spec_version() {
        Some(current_version) => current_version,
        None => {
            return Err(RocflError::IllegalOperation(format!(
                "Cannot upgrade object to {} because the current version is unknown",
                version.version()
            )));
        }
    };

    if version <= current_version {
        return Err(RocflError::IllegalOperation(
            format!("Cannot upgrade object to {} because the current version, {}, is greater than or equal to the new version.",
                    version.version(), current_version.version())));
    }

    // Cloned into a local so the read lock is released before the checks run.
    let repo_version = self.spec_version.read().unwrap().clone();
    match repo_version {
        // An object may not conform to a newer spec version than its repository.
        Some(Known(repo_version)) if repo_version < version => {
            return Err(RocflError::IllegalOperation(
                format!("Cannot upgrade object to {} because it is greater than the repository version, {}.",
                        version.version(), repo_version.version())));
        }
        Some(Unknown(repo_version)) => {
            return Err(RocflError::IllegalOperation(format!(
                "Cannot upgrade object to {} because the repository version, {}, is unrecognized.",
                version.version(), repo_version
            )));
        }
        _ => {}
    }

    inventory.type_declaration = version.inventory_type().to_string();
    staging.stage_inventory(&inventory, false, false)?;

    self.commit_inner(object_id, meta, None, pretty_print, staging)
}
/// Upgrades the repository to a newer OCFL spec version.
///
/// On success the cached repository version is updated to `version`.
///
/// # Errors
///
/// * `RocflError::Closed` if the repository has been closed
/// * `RocflError::IllegalOperation` if the current version is known and not older than
///   `version`, or if the current version is unrecognized
/// * any error from the store's upgrade
pub fn upgrade_repo(&self, version: SpecVersion) -> Result<()> {
    self.ensure_open()?;

    // Cloned into a local so the read lock is released before the write lock is taken.
    let current_version = self.spec_version.read().unwrap().clone();

    match current_version {
        Some(Known(current)) if current >= version => {
            return Err(RocflError::IllegalOperation(
                format!("Cannot upgrade repository to {} because the current version, {}, is greater than or equal to the new version.",
                        version.version(), current.version())));
        }
        Some(Unknown(current)) => {
            return Err(RocflError::IllegalOperation(format!(
                "Cannot upgrade repository to {} because the current version, {}, is unrecognized.",
                version.version(),
                current
            )));
        }
        _ => {}
    }

    self.store.upgrade_repo(version)?;

    *self.spec_version.write().unwrap() = Some(Known(version));

    Ok(())
}
/// Finalizes the staged version of an object and writes it to the main store.
///
/// The caller is expected to hold the object's lock. The staged head is deduplicated,
/// `meta` is applied, the finalized inventory is written to staging, and duplicate and
/// orphaned staged files are removed. If the repository is still open at that point,
/// the object or version is written to the main store and the staged copy is purged.
///
/// # Errors
///
/// Returns `RocflError::General` when nothing is staged for the object, or any error
/// from the staging store or the main store.
fn commit_inner(
    &self,
    object_id: &str,
    meta: CommitMeta,
    object_root: Option<&str>,
    pretty_print: bool,
    staging: &FsOcflStore,
) -> Result<()> {
    let mut inventory = match staging.get_inventory(object_id) {
        Ok(staged) => staged,
        Err(RocflError::NotFound(_)) => {
            return Err(RocflError::General(format!(
                "No staged changes found for object {}",
                object_id
            )));
        }
        Err(e) => return Err(e),
    };

    let duplicates = inventory.dedup_head();
    inventory.head_version_mut().update_meta(meta);

    staging.stage_inventory(&inventory, true, pretty_print)?;

    let duplicate_refs: Vec<&ContentPath> = duplicates.iter().map(|p| p.as_ref()).collect();
    staging.rm_staged_files(&inventory, &duplicate_refs)?;
    staging.rm_orphaned_files(&inventory)?;

    // Nothing is written to the main store once the repository has been closed.
    if self.is_closed() {
        return Ok(());
    }

    if inventory.is_new() {
        let src_object_root = PathBuf::from(&inventory.storage_path);
        self.store
            .write_new_object(&mut inventory, &src_object_root, object_root)?;
    } else {
        let version_root = paths::version_path(&inventory.storage_path, inventory.head);
        self.store
            .write_new_version(&mut inventory, &version_root)?;
    }

    staging.purge_object(object_id)?;

    Ok(())
}
/// Returns the staged inventory of an object, staging the object first when necessary.
///
/// When nothing is staged yet, the inventory is loaded from the main store, given a new
/// staging head and written to the staging store. A warning is logged for every object
/// extension that is not listed in `SUPPORTED_EXTENSIONS`.
///
/// # Errors
///
/// * `RocflError::IllegalOperation` if the object's spec version is unknown
/// * `RocflError::IllegalState` if the object has an active mutable HEAD
/// * any error from either store
fn get_or_created_staged_inventory(&self, object_id: &str) -> Result<Inventory> {
    let staging = self.get_staging()?;

    // Anything but "not found" -- success or failure -- is the final answer.
    match staging.get_inventory(object_id) {
        Err(RocflError::NotFound(_)) => {}
        found_or_failed => return found_or_failed,
    }

    let mut inventory = self.store.get_inventory(object_id)?;

    if inventory.spec_version().is_none() {
        return Err(RocflError::IllegalOperation(
            "Cannot update object because it conforms to an unknown/unsupported spec version".to_string(),
        ));
    }

    if inventory.mutable_head {
        return Err(RocflError::IllegalState(
            "Cannot stage changes for object because it has an active mutable HEAD."
                .to_string(),
        ));
    }

    for extension in self.store.list_object_extensions(object_id)? {
        let supported = SUPPORTED_EXTENSIONS.contains(&extension.as_ref());
        if !supported {
            warn!("Object {} uses unsupported extension {}. Modifying this object may have unintended consequences.",
                  object_id, extension);
        }
    }

    inventory.create_staging_head()?;
    staging.stage_object(&mut inventory)?;

    Ok(inventory)
}
/// Loads the staged inventory of an object without creating one.
///
/// # Errors
///
/// Returns `RocflError::NotFound` if the staging directory does not exist or the staging
/// store has no such object, or any other error reported by the staging store.
fn get_staged_inventory(&self, object_id: &str) -> Result<Inventory> {
    let not_staged =
        || RocflError::NotFound(format!("{} does not have a staged version.", object_id));

    // Avoid creating the staging store just to learn that nothing is staged.
    if !self.staging_root.exists() {
        return Err(not_staged());
    }

    self.get_staging()?
        .get_inventory(object_id)
        .map_err(|e| match e {
            RocflError::NotFound(_) => not_staged(),
            other => other,
        })
}
/// Applies `operator` to every file found under the `src` paths, mapping each file to a
/// logical path beneath `dst` in the object's staged head, and then writes the staged
/// inventory.
///
/// * `object_id` - object whose staged version is modified
/// * `src` - files and/or directories on the local filesystem
/// * `dst` - destination logical path inside the object
/// * `recursive` - whether directories in `src` are walked; when `false` they are
///   reported as skipped
/// * `operator` - invoked as `operator(file, logical_path, inventory)` for each file
///
/// An empty `src` is a no-op. Failures for individual sources are collected rather than
/// aborting; the staged inventory is still written, after which the collected failures
/// are returned as a single `RocflError::CopyMoveError`.
fn operate_on_external_source(
&self,
object_id: &str,
src: &[impl AsRef<Path>],
dst: &str,
recursive: bool,
operator: impl Fn(&Path, LogicalPath, &mut Inventory) -> Result<()>,
) -> Result<()> {
if src.is_empty() {
return Ok(());
}
let _lock = self.get_lock_manager()?.acquire(object_id)?;
let mut inventory = self.get_or_created_staged_inventory(object_id)?;
let dst_path = dst.try_into()?;
// These three facts decide whether `dst` is treated as a directory to place sources
// into or as the exact target path.
let dst_dir_exists = inventory.head_version().is_dir(&dst_path);
let src_is_many = src.len() > 1;
let dst_has_slash = dst.ends_with('/');
let mut errors = Vec::new();
for path in src.iter() {
// Stop picking up new sources once the repository has been closed.
if self.is_closed() {
break;
}
let path = path.as_ref();
if !path.exists() {
errors.push(format!(
"Failed to copy/move {}: Does not exist",
path.to_string_lossy()
));
continue;
}
let mut attempt = || -> Result<()> {
if path.is_file() {
let parent = path.parent().unwrap();
// A single file goes to `dst` itself unless `dst` must be a directory, in which
// case the file keeps its name beneath `dst`.
let logical_path = if dst_dir_exists || src_is_many || dst_has_slash {
logical_path_in_dst_dir(path, parent, dst)?
} else {
dst_path.clone()
};
inventory
.head_version()
.validate_non_conflicting(&logical_path)?;
operator(path, logical_path, &mut inventory)?;
} else if recursive {
for file in WalkDir::new(path) {
if self.is_closed() {
break;
}
let file = file?;
if file.file_type().is_file() {
let mut attempt = || -> Result<()> {
// Relative to the directory's parent the directory name is kept beneath `dst`;
// relative to the directory itself only its contents are placed beneath `dst`.
let logical_path = if dst_dir_exists || src_is_many {
let grandparent = path.parent().unwrap_or(path);
logical_path_in_dst_dir(file.path(), grandparent, dst)?
} else {
logical_path_in_dst_dir(file.path(), path, dst)?
};
inventory
.head_version()
.validate_non_conflicting(&logical_path)?;
operator(file.path(), logical_path, &mut inventory)
};
// A failure on one walked file does not stop the walk.
if let Err(e) = attempt() {
errors.push(format!(
"Failed to copy/move {}: {}",
file.path().to_string_lossy(),
e
));
}
}
}
} else {
errors.push(format!(
"Skipping directory {} because recursion is not enabled",
path.to_string_lossy()
));
}
Ok(())
};
if let Err(e) = attempt() {
errors.push(format!(
"Failed to copy/move {}: {}",
path.to_string_lossy(),
e
));
}
}
// The inventory is staged even when some sources failed, so that the successful
// operations are recorded.
inventory.head_version_mut().created = Local::now();
self.get_staging()?
.stage_inventory(&inventory, false, false)?;
if !errors.is_empty() {
return Err(RocflError::CopyMoveError(MultiError(errors)));
}
Ok(())
}
/// Copies an external file into staging at `logical_path` and records it in the head
/// version of `inventory`.
///
/// The digest is obtained from the reader that wraps the source file while the staging
/// store consumes it.
///
/// # Errors
///
/// Returns any error from opening the file, staging the copy or updating the inventory.
fn copy_file(
    &self,
    file: impl AsRef<Path>,
    logical_path: LogicalPath,
    inventory: &mut Inventory,
) -> Result<()> {
    let source = File::open(&file)?;
    let mut digesting_reader = inventory.digest_algorithm.reader(source);

    info!(
        "Copying file {} into object at {}",
        file.as_ref().to_string_lossy(),
        logical_path
    );

    let staging = self.get_staging()?;
    staging.stage_file_copy(inventory, &mut digesting_reader, &logical_path)?;

    let digest = digesting_reader.finalize_hex();
    inventory.add_file_to_head(digest, logical_path)
}
/// Moves an external file into staging at `logical_path` and records it in the head
/// version of `inventory`.
///
/// The file is hashed before it is handed to the staging store.
///
/// # Errors
///
/// Returns any error from opening or hashing the file, staging the move or updating the
/// inventory.
fn move_file(
    &self,
    file: impl AsRef<Path>,
    logical_path: LogicalPath,
    inventory: &mut Inventory,
) -> Result<()> {
    info!(
        "Moving file {} into object at {}",
        file.as_ref().to_string_lossy(),
        logical_path
    );

    let mut source = File::open(file.as_ref())?;
    let digest = inventory.digest_algorithm.hash_hex(&mut source)?;

    let staging = self.get_staging()?;
    staging.stage_file_move(inventory, &file, &logical_path)?;

    inventory.add_file_to_head(digest, logical_path)
}
/// Works out, for a copy or move within an object, which source logical paths map to
/// which destination logical paths.
///
/// * `inventory` - inventory whose head version is consulted for the destination
/// * `src_version_num` - version the `src` patterns are resolved against
/// * `src` - source logical paths or globs
/// * `dst` - destination logical path
/// * `recursive` - whether directories matching a pattern contribute their contents
///
/// Returns the source-to-destination mapping together with the messages of every
/// problem encountered; problems with individual patterns or files do not abort the
/// resolution.
///
/// Fails outright only if `dst` cannot be converted into a logical path or
/// `src_version_num` cannot be looked up in the inventory.
// The tuple return type is what trips `clippy::type_complexity`.
#[allow(clippy::type_complexity)]
fn resolve_internal_moves(
&self,
inventory: &Inventory,
src_version_num: VersionNum,
src: &[impl AsRef<str>],
dst: &str,
recursive: bool,
) -> Result<(HashMap<Rc<LogicalPath>, LogicalPath>, Vec<String>)> {
let mut to_move = HashMap::new();
let mut errors = Vec::new();
let dst_path = dst.try_into()?;
// These facts decide whether `dst` is treated as a directory to place sources into
// or as the exact target path.
let dst_dir_exists = inventory.head_version().is_dir(&dst_path);
let src_is_many = src.len() > 1;
let dst_has_slash = dst.ends_with('/');
let version = inventory.get_version(src_version_num)?;
for path in src {
let mut has_matches = false;
// Files matched by the pattern itself (resolved non-recursively).
let files = match version.resolve_glob(path.as_ref(), false) {
Ok(files) => files,
Err(e) => {
errors.push(format!("Failed to resolve path {}: {}", path.as_ref(), e));
continue;
}
};
let many_files = files.len() > 1;
if recursive {
// A failure here is recorded, but the matched files are still processed below.
let dirs = match version.resolve_glob_to_dirs(path.as_ref()) {
Ok(dirs) => dirs,
Err(e) => {
errors.push(format!("Failed to resolve path {}: {}", path.as_ref(), e));
HashSet::new()
}
};
let many_dirs = dirs.len() > 1;
for dir in dirs {
let children = version.paths_with_prefix(dir.as_ref());
let many_children = children.len() > 1;
for file in children {
let mut attempt = || -> Result<()> {
// Relative to the directory's parent the directory name is kept beneath `dst`;
// relative to the directory itself only its contents are placed beneath `dst`.
let logical_path = if dst_dir_exists
|| src_is_many
|| many_children
|| many_dirs
|| !files.is_empty()
{
logical_path_in_dst_dir_internal(&file, &dir.parent(), dst)?
} else {
logical_path_in_dst_dir_internal(&file, dir, dst)?
};
has_matches = true;
to_move.insert(file.clone(), logical_path);
Ok(())
};
if let Err(e) = attempt() {
errors.push(format!("Failed to copy/move file {}: {}", file, e));
}
}
}
}
for file in files {
let mut attempt = || -> Result<()> {
// A file goes to `dst` itself only when it is the sole thing being moved and `dst`
// is not known to be a directory; otherwise it keeps its filename beneath `dst`.
let logical_path = if dst_dir_exists
|| src_is_many
|| dst_has_slash
|| many_files
|| !to_move.is_empty()
{
dst_path.resolve(&file.filename().try_into()?)
} else {
dst_path.clone()
};
has_matches = true;
to_move.insert(file.clone(), logical_path);
Ok(())
};
if let Err(e) = attempt() {
errors.push(format!("Failed to copy/move file {}: {}", file, e));
}
}
// A pattern that produced no mapping at all is reported as an error.
if !has_matches {
errors.push(format!(
"Object {} version {} does not contain any files at {}",
inventory.id,
src_version_num,
path.as_ref()
));
}
}
Ok((to_move, errors))
}
/// Returns the staging store, initializing it at `staging_root` on first use.
///
/// The staging store uses the `HashedNTupleLayout` storage layout with no further
/// layout configuration.
///
/// # Errors
///
/// Returns any error from creating the layout or initializing the store. A failed
/// initialization is not cached, so it is attempted again on the next call.
fn get_staging(&self) -> Result<&FsOcflStore> {
    self.staging.get_or_try_init(|| {
        let layout = StorageLayout::new(LayoutExtensionName::HashedNTupleLayout, None)?;
        FsOcflStore::init_if_needed(&self.staging_root, layout)
    })
}
/// Returns the lock manager for staged objects, creating it on first use.
///
/// The staging store is initialized first; the locks directory is then created beneath
/// the staging root at the location given by `paths::locks_extension_path`.
///
/// # Errors
///
/// Returns any error from initializing staging or creating the locks directory.
fn get_lock_manager(&self) -> Result<&LockManager> {
    // Called for its side effect only: the staging store must exist before locks do.
    self.get_staging()?;

    let create_manager = || -> Result<LockManager> {
        let locks_dir = paths::locks_extension_path(&self.staging_root);
        fs::create_dir_all(&locks_dir)?;
        Ok(LockManager::new(locks_dir))
    };

    self.staging_lock_manager.get_or_try_init(create_manager)
}
/// Guard used at the top of the public operations.
///
/// # Errors
///
/// Returns `RocflError::Closed` once `close` has been called.
fn ensure_open(&self) -> Result<()> {
    if self.is_open() {
        Ok(())
    } else {
        Err(RocflError::Closed)
    }
}
/// Reports whether `close` has been called on this repository.
fn is_closed(&self) -> bool {
    // Acquire pairs with the Release store performed in `close`.
    let closed = self.closed.load(Ordering::Acquire);
    closed
}
/// Reports whether the repository is still usable, i.e. `close` has not been called.
fn is_open(&self) -> bool {
    let closed = self.is_closed();
    !closed
}
}
/// Iterator that turns a stream of inventories into a stream of `T` by applying a
/// conversion to every inventory that was read successfully.
struct InventoryAdapterIter<'a, T> {
/// Source of inventories; read failures are passed through as `Err` items.
iter: Box<dyn Iterator<Item = Result<Inventory>> + 'a>,
/// Conversion applied to each successfully read inventory.
adapter: Box<dyn Fn(Inventory) -> Result<T>>,
}
impl<'a, T> InventoryAdapterIter<'a, T> {
    /// Wraps `iter` so that every inventory it yields is passed through `adapter`.
    ///
    /// `adapter` is boxed and stored for the lifetime of the iterator.
    fn new(
        iter: Box<dyn Iterator<Item = Result<Inventory>> + 'a>,
        adapter: impl Fn(Inventory) -> Result<T> + 'a + 'static,
    ) -> Self {
        let adapter: Box<dyn Fn(Inventory) -> Result<T>> = Box::new(adapter);
        Self { iter, adapter }
    }
}
impl<'a, T> Iterator for InventoryAdapterIter<'a, T> {
    type Item = Result<T>;

    /// Yields the adapted form of the next inventory.
    ///
    /// Errors from the underlying iterator are passed through untouched; errors from
    /// the adapter are yielded in place of the converted value.
    fn next(&mut self) -> Option<Self::Item> {
        let next_inventory = self.iter.next()?;
        Some(next_inventory.and_then(|inventory| self.adapter.deref()(inventory)))
    }
}
fn logical_path_in_dst_dir(
src: impl AsRef<Path>,
base: impl AsRef<Path>,
dst: &str,
) -> Result<LogicalPath> {
let mut logical_path = dst.to_string();
if !logical_path.ends_with('/') {
logical_path.push('/');
}
let relative_path = pathdiff::diff_paths(src, base).unwrap();
let relative_str = relative_path.to_string_lossy();
logical_path.push_str(&util::convert_backslash_to_forward(&relative_str));
logical_path.try_into()
}
fn logical_path_in_dst_dir_internal(
src: &LogicalPath,
base: &LogicalPath,
dst: &str,
) -> Result<LogicalPath> {
let mut logical_path = dst.to_string();
if !logical_path.ends_with('/') {
logical_path.push('/');
}
let base_length = if base.as_str().is_empty() {
0
} else {
base.as_str().len() + 1
};
logical_path.push_str(&src.as_str()[base_length..]);
logical_path.try_into()
}
/// Looks up the digest and content path of `src_path` when its content is stored under
/// the inventory's head version directory.
///
/// Returns `Ok(None)` when the content path does not start with the head version
/// prefix.
///
/// # Errors
///
/// Returns `RocflError::IllegalState` if `src_path` has no digest in `src_version_num`,
/// or any error from looking up the version or the content path.
fn lookup_staged_digest_and_content_path(
    inventory: &Inventory,
    src_version_num: VersionNum,
    src_path: &LogicalPath,
) -> Result<Option<(HexDigest, Rc<ContentPath>)>> {
    let digest = inventory
        .get_version(src_version_num)?
        .lookup_digest(src_path)
        .ok_or_else(|| {
            RocflError::IllegalState(format!("Failed to find digest for {}", src_path))
        })?;

    let content_path =
        inventory.content_path_for_digest(digest, VersionRef::Head, Some(src_path))?;

    let head_prefix = format!("{}/", inventory.head);
    if !content_path.starts_with(&head_prefix) {
        return Ok(None);
    }

    Ok(Some((digest.as_ref().clone(), content_path.clone())))
}