use crate::constants::OXEN_HIDDEN_DIR;
use crate::core::db;
use crate::error::OxenError;
use crate::model::LocalRepository;
use crate::model::staged_data::StagedDataOpts;
use crate::opts::RmOpts;
use crate::repositories;
use crate::util;
use crate::core::v_latest::index::CommitMerkleTree;
use crate::model::merkle_tree::node::FileNode;
use crate::view::ErrorFileInfo;
use indicatif::ProgressBar;
use indicatif::ProgressStyle;
use rocksdb::IteratorMode;
use tokio::time::Duration;
use crate::core::staged::with_staged_db_manager;
use crate::core::v_latest::add::CumulativeStats;
use crate::core::v_latest::add::add_file_node_and_parent_dir;
use crate::model::merkle_tree::node::EMerkleTreeNode;
use crate::model::merkle_tree::node::MerkleTreeNode;
use crate::model::merkle_tree::node::StagedMerkleTreeNode;
use crate::constants::STAGED_DIR;
use crate::model::Commit;
use crate::model::StagedEntryStatus;
use parking_lot::Mutex;
use rmp_serde::Serializer;
use serde::Serialize;
use std::collections::HashMap;
use std::collections::HashSet;
use std::path::{Path, PathBuf};
use std::str;
use rocksdb::{DBWithThreadMode, MultiThreaded};
use std::sync::Arc;
pub fn rm(
paths: &HashSet<PathBuf>,
repo: &LocalRepository,
opts: &RmOpts,
) -> Result<(), OxenError> {
let db_opts = db::key_val::opts::default();
let db_path = util::fs::oxen_hidden_dir(&repo.path).join(STAGED_DIR);
let staged_db: DBWithThreadMode<MultiThreaded> =
DBWithThreadMode::open(&db_opts, dunce::simplified(&db_path))?;
rm_with_staged_db(paths, repo, opts, &staged_db)
}
pub fn rm_with_staged_db(
paths: &HashSet<PathBuf>,
repo: &LocalRepository,
opts: &RmOpts,
staged_db: &DBWithThreadMode<MultiThreaded>,
) -> Result<(), OxenError> {
if has_modified_files(repo, paths)? {
let error = "There are modified files in the working directory.\n\tUse `oxen status` to see the modified files.".to_string();
return Err(OxenError::basic_str(error));
}
if opts.staged && opts.recursive {
return remove_staged_recursively_inner(repo, paths, staged_db);
} else if opts.staged {
return remove_staged_inner(repo, paths, opts, staged_db);
}
remove_inner(paths, repo, opts, staged_db)?;
Ok(())
}
fn remove_staged_recursively_inner(
repo: &LocalRepository,
paths: &HashSet<PathBuf>,
staged_db: &DBWithThreadMode<MultiThreaded>,
) -> Result<(), OxenError> {
let iter = staged_db.iterator(IteratorMode::Start);
for item in iter {
match item {
Ok((key, _)) => match str::from_utf8(&key) {
Ok(key) => {
log::debug!("considering key: {key:?}");
for path in paths {
let path = util::fs::path_relative_to_dir(path, &repo.path)?;
let db_path = PathBuf::from(key);
log::debug!("considering rm db_path: {db_path:?} for path: {path:?}");
if db_path.starts_with(&path) && path != Path::new("") {
let mut parent = db_path.parent().unwrap_or(Path::new(""));
remove_staged_entry(&db_path, staged_db)?;
while parent != Path::new("") {
log::debug!("maybe cleaning up empty dir: {parent:?}");
cleanup_empty_dirs(parent, staged_db)?;
parent = parent.parent().unwrap_or(Path::new(""));
if parent == Path::new("") {
cleanup_empty_dirs(parent, staged_db)?;
}
}
}
}
}
_ => {
return Err(OxenError::basic_str("Could not read utf8 val..."));
}
},
_ => {
return Err(OxenError::basic_str(
"Could not read iterate over db values",
));
}
}
}
Ok(())
}
fn has_modified_files(repo: &LocalRepository, paths: &HashSet<PathBuf>) -> Result<bool, OxenError> {
let modified = list_modified_files(repo, paths)?;
Ok(!modified.is_empty())
}
fn list_modified_files(
repo: &LocalRepository,
paths: &HashSet<PathBuf>,
) -> Result<Vec<PathBuf>, OxenError> {
let paths_vec: Vec<PathBuf> = paths.iter().map(|p| repo.path.join(p)).collect();
let opts = StagedDataOpts::from_paths(&paths_vec);
let status = repositories::status::status_from_opts(repo, &opts)?;
log::debug!("status modified_files: {:?}", status.modified_files);
log::debug!("paths: {paths:?}");
let modified: Vec<PathBuf> = status
.modified_files
.into_iter()
.filter(|path| {
paths.contains(path.parent().unwrap_or(Path::new(""))) || paths.contains(path)
})
.collect();
Ok(modified)
}
fn cleanup_empty_dirs(
path: &Path,
staged_db: &DBWithThreadMode<MultiThreaded>,
) -> Result<(), OxenError> {
let iter = staged_db.iterator(IteratorMode::Start);
let mut total = 0;
for item in iter {
match item {
Ok((key, _)) => match str::from_utf8(&key) {
Ok(key) => {
log::debug!("considering key: {key:?}");
let db_path = PathBuf::from(key);
if db_path.starts_with(path) && path != db_path {
total += 1;
}
}
_ => {
return Err(OxenError::basic_str(
"Could not read iterate over db values",
));
}
},
_ => {
return Err(OxenError::basic_str(
"Could not read iterate over db values",
));
}
}
}
log::debug!("total sub paths for dir {path:?}: {total}");
if total == 0 {
log::debug!("removing empty dir: {path:?}");
staged_db.delete(path.to_str().unwrap())?;
}
Ok(())
}
fn remove_staged_inner(
repo: &LocalRepository,
paths: &HashSet<PathBuf>,
rm_opts: &RmOpts,
staged_db: &DBWithThreadMode<MultiThreaded>,
) -> Result<(), OxenError> {
log::debug!("remove_staged paths {paths:?}");
for path in paths {
let relative_path = util::fs::path_relative_to_dir(path, &repo.path)?;
let Some(entry) = get_staged_entry(&relative_path, staged_db)? else {
continue;
};
if entry.node.is_dir() && !rm_opts.recursive {
let error = format!("`oxen rm` on directory {path:?} requires -r");
return Err(OxenError::basic_str(error));
}
remove_staged_entry(&relative_path, staged_db)?;
}
Ok(())
}
pub fn remove_staged(
repo: &LocalRepository,
paths: &HashSet<PathBuf>,
rm_opts: &RmOpts,
) -> Result<(), OxenError> {
let opts = db::key_val::opts::default();
let db_path = util::fs::oxen_hidden_dir(&repo.path).join(STAGED_DIR);
let staged_db: DBWithThreadMode<MultiThreaded> =
DBWithThreadMode::open(&opts, dunce::simplified(&db_path))?;
remove_staged_inner(repo, paths, rm_opts, &staged_db)
}
fn get_staged_entry(
path: &Path,
staged_db: &DBWithThreadMode<MultiThreaded>,
) -> Result<Option<StagedMerkleTreeNode>, OxenError> {
let path_str = path.to_str().unwrap();
let Some(value) = staged_db.get(path_str)? else {
return Ok(None);
};
Ok(Some(rmp_serde::from_slice(&value)?))
}
fn remove_staged_entry(
path: &Path,
staged_db: &DBWithThreadMode<MultiThreaded>,
) -> Result<(), OxenError> {
log::debug!("remove_staged path: {path:?} from staged db {staged_db:?}");
staged_db.delete(path.to_str().unwrap())?;
Ok(())
}
fn remove_file_inner(
repo: &LocalRepository,
path: &Path,
file_node: &FileNode,
staged_db: &DBWithThreadMode<MultiThreaded>,
) -> Result<CumulativeStats, OxenError> {
let path = util::fs::path_relative_to_dir(path, &repo.path)?;
log::debug!("remove_file path is {path:?}");
let mut total = CumulativeStats {
total_files: 0,
total_bytes: 0,
data_type_counts: HashMap::new(),
};
match process_remove_file_and_parents(repo, &path, staged_db, file_node) {
Ok(Some(node)) => {
if let EMerkleTreeNode::File(file_node) = &node.node.node {
total.total_bytes += file_node.num_bytes();
total.total_files += 1;
total
.data_type_counts
.entry(file_node.data_type().clone())
.and_modify(|count| *count += 1)
.or_insert(1);
}
Ok(total)
}
Err(e) => {
let error = format!("Error adding file {path:?}: {e:?}");
Err(OxenError::basic_str(error))
}
_ => {
let error = format!("Error adding file {path:?}");
Err(OxenError::basic_str(error))
}
}
}
pub fn remove_file(
repo: &LocalRepository,
path: &Path,
file_node: &FileNode,
) -> Result<CumulativeStats, OxenError> {
let opts = db::key_val::opts::default();
let db_path = util::fs::oxen_hidden_dir(&repo.path).join(STAGED_DIR);
let staged_db: DBWithThreadMode<MultiThreaded> =
DBWithThreadMode::open(&opts, dunce::simplified(&db_path))?;
remove_file_inner(repo, path, file_node, &staged_db)
}
pub fn remove_file_with_db_manager(
repo: &LocalRepository,
path: &Path,
file_node: &FileNode,
seen_dirs: &Arc<Mutex<HashSet<PathBuf>>>,
) -> Result<Vec<ErrorFileInfo>, OxenError> {
let mut err_files: Vec<ErrorFileInfo> = vec![];
let _ = with_staged_db_manager(repo, |staged_db_manager| {
let status = StagedEntryStatus::Removed;
match add_file_node_and_parent_dir(file_node, status, path, staged_db_manager, seen_dirs) {
Ok(_) => Ok(()),
Err(e) => {
err_files.push(ErrorFileInfo {
hash: file_node.hash().to_string(),
path: Some(path.to_path_buf()),
error: format!("Failed to add file to staged db: {e}"),
});
Err(e)
}
}
});
Ok(err_files)
}
pub fn remove_dir_with_db_manager(
repo: &LocalRepository,
root_dir: &MerkleTreeNode,
root_path: &Path,
seen_dirs: &Arc<Mutex<HashSet<PathBuf>>>,
) -> Result<(), OxenError> {
let empty_path = PathBuf::new();
let mut staged_nodes: HashMap<PathBuf, StagedMerkleTreeNode> = HashMap::new();
with_staged_db_manager(repo, |staged_db_manager| {
let nodes = root_dir.list_files_and_dirs()?;
let parent_path = root_path.parent().unwrap_or(&empty_path);
for (path, node) in nodes {
let path = parent_path.join(path);
let corrected_node = match &node.node {
EMerkleTreeNode::File(file_node) => {
let mut file_node = file_node.clone();
file_node.set_name(&path.to_string_lossy());
MerkleTreeNode {
hash: node.hash,
node: EMerkleTreeNode::File(file_node.clone()),
parent_id: node.parent_id,
children: node.children.clone(),
}
}
EMerkleTreeNode::Directory(dir_node) => {
let mut dir_node = dir_node.clone();
dir_node.set_name(path.to_string_lossy());
MerkleTreeNode {
hash: node.hash,
node: EMerkleTreeNode::Directory(dir_node.clone()),
parent_id: node.parent_id,
children: node.children.clone(),
}
}
_ => {
return Err(OxenError::basic_str("Error: Unexpected node type"));
}
};
let staged_node = StagedMerkleTreeNode {
status: StagedEntryStatus::Removed,
node: corrected_node,
};
staged_nodes.insert(path, staged_node);
}
log::debug!("staged_nodes: {}", staged_nodes.len());
let mut parent_path = root_path.to_path_buf();
while let Some(parent) = parent_path.parent() {
parent_path = parent.to_path_buf();
match staged_db_manager.add_directory(&parent_path, seen_dirs) {
Ok(_) => {}
Err(e) => {
log::debug!("Error adding parent dirs: {e:?}");
return Err(e);
}
}
if parent_path == Path::new("") {
break;
}
}
match staged_db_manager.upsert_staged_nodes(&staged_nodes) {
Ok(_) => {
log::debug!("Successfully upserted staged nodes");
Ok(())
}
Err(e) => {
log::error!("Failed to upsert staged nodes due to error: {e:?}");
Err(e)
}
}
})
}
fn process_remove_file_and_parents(
repo: &LocalRepository,
path: &Path,
staged_db: &DBWithThreadMode<MultiThreaded>,
file_node: &FileNode,
) -> Result<Option<StagedMerkleTreeNode>, OxenError> {
let repo_path = repo.path.clone();
let mut update_node = file_node.clone();
update_node.set_name(path.to_string_lossy().to_string().as_str());
log::debug!("Update node is: {update_node:?}");
let node = MerkleTreeNode::from_file(update_node);
let staged_entry = StagedMerkleTreeNode {
status: StagedEntryStatus::Removed,
node,
};
log::debug!("Staged entry is: {staged_entry}");
log::debug!("writing removed file to staged db: {staged_entry}");
let mut buf = Vec::new();
staged_entry
.serialize(&mut Serializer::new(&mut buf))
.unwrap();
let node_path = path.to_str().unwrap();
staged_db.put(node_path, &buf).unwrap();
let mut parent_path = path.to_path_buf();
while let Some(parent) = parent_path.parent() {
let relative_path = util::fs::path_relative_to_dir(parent, repo_path.clone())?;
parent_path = parent.to_path_buf();
let relative_path_str = relative_path.to_str().unwrap();
let dir_entry = StagedMerkleTreeNode {
status: StagedEntryStatus::Modified,
node: MerkleTreeNode::default_dir_from_path(&relative_path),
};
log::debug!("writing dir to staged db: {dir_entry}");
let mut buf = Vec::new();
dir_entry.serialize(&mut Serializer::new(&mut buf)).unwrap();
staged_db.put(relative_path_str, &buf).unwrap();
if relative_path == Path::new("") {
break;
}
}
Ok(Some(staged_entry))
}
fn remove_inner(
paths: &HashSet<PathBuf>,
repo: &LocalRepository,
opts: &RmOpts,
staged_db: &DBWithThreadMode<MultiThreaded>,
) -> Result<CumulativeStats, OxenError> {
let start = std::time::Instant::now();
let Some(head_commit) = repositories::commits::head_commit_maybe(repo)? else {
let error = "Error: head commit not found".to_string();
return Err(OxenError::basic_str(error));
};
let mut total = CumulativeStats {
total_files: 0,
total_bytes: 0,
data_type_counts: HashMap::new(),
};
for path in paths {
let path = util::fs::path_relative_to_dir(path, &repo.path)?;
let parent_path = path.parent().unwrap_or(Path::new(""));
let parent_node: MerkleTreeNode = if let Some(dir_node) =
CommitMerkleTree::dir_with_children(repo, &head_commit, parent_path, None)?
{
dir_node
} else {
let error = format!("Error: parent dir not found in tree for {path:?}");
return Err(OxenError::basic_str(error));
};
let relative_path = util::fs::path_relative_to_dir(path.clone(), parent_path)?;
if let Some(node) = parent_node.get_by_path(relative_path.clone())? {
if let EMerkleTreeNode::Directory(_) = &node.node {
if !opts.recursive {
let error = format!("`oxen rm` on directory {path:?} requires -r");
return Err(OxenError::basic_str(error));
}
total += remove_dir_inner(repo, &head_commit, &path, staged_db)?;
let full_path = repo.path.join(path);
log::debug!("Removing directory: {full_path:?}");
if full_path.exists() {
if full_path != repo.path && full_path != repo.path.join(OXEN_HIDDEN_DIR) {
util::fs::remove_dir_all(&full_path)?;
log::debug!("Successfully removed directory from filesystem");
}
}
} else if let EMerkleTreeNode::File(file_node) = &node.node {
total += remove_file_inner(repo, &path, file_node, staged_db)?;
let full_path = repo.path.join(path);
log::debug!("Removing file: {full_path:?}");
if full_path.exists() {
util::fs::remove_file(&full_path)?;
}
} else {
let error = "Error: Unexpected file type".to_string();
return Err(OxenError::basic_str(error));
}
} else {
let error = format!("Error: {path:?} must be committed in order to use `oxen rm`");
return Err(OxenError::basic_str(error));
}
}
let duration = Duration::from_millis(start.elapsed().as_millis() as u64);
log::debug!("---END--- oxen rm: {paths:?} duration: {duration:?}");
println!(
"🐂 oxen removed {} files ({}) in {}",
total.total_files,
bytesize::ByteSize::b(total.total_bytes),
humantime::format_duration(duration)
);
Ok(total)
}
pub fn process_remove_file(
path: &Path,
file_node: &FileNode,
staged_db: &DBWithThreadMode<MultiThreaded>,
) -> Result<Option<StagedMerkleTreeNode>, OxenError> {
let mut update_node = file_node.clone();
update_node.set_name(&path.to_string_lossy());
let node = MerkleTreeNode::from_file(update_node);
let staged_entry = StagedMerkleTreeNode {
status: StagedEntryStatus::Removed,
node,
};
log::debug!("writing removed file to staged db: {staged_entry}");
let mut buf = Vec::new();
staged_entry
.serialize(&mut Serializer::new(&mut buf))
.unwrap();
let relative_path_str = path.to_str().unwrap();
staged_db.put(relative_path_str, &buf).unwrap();
Ok(Some(staged_entry))
}
fn remove_dir_inner(
repo: &LocalRepository,
commit: &Commit,
path: &Path,
staged_db: &DBWithThreadMode<MultiThreaded>,
) -> Result<CumulativeStats, OxenError> {
let dir_node = match CommitMerkleTree::dir_with_children_recursive(repo, commit, path, None)? {
Some(node) => node,
None => {
let error = format!("Error: {path:?} must be committed in order to use `oxen rm`");
return Err(OxenError::basic_str(error));
}
};
process_remove_dir(repo, path, &dir_node, staged_db)
}
pub fn remove_dir(
repo: &LocalRepository,
commit: &Commit,
path: &Path,
) -> Result<CumulativeStats, OxenError> {
let opts = db::key_val::opts::default();
let db_path = util::fs::oxen_hidden_dir(&repo.path).join(STAGED_DIR);
let staged_db: DBWithThreadMode<MultiThreaded> =
DBWithThreadMode::open(&opts, dunce::simplified(&db_path))?;
remove_dir_inner(repo, commit, path, &staged_db)
}
fn process_remove_dir(
repo: &LocalRepository,
path: &Path,
dir_node: &MerkleTreeNode,
staged_db: &DBWithThreadMode<MultiThreaded>,
) -> Result<CumulativeStats, OxenError> {
log::debug!("Process Remove Dir");
let progress_1 = Arc::new(ProgressBar::new_spinner());
progress_1.set_style(ProgressStyle::default_spinner());
progress_1.enable_steady_tick(Duration::from_millis(100));
let repo = repo.clone();
let repo_path = repo.path.clone();
let progress_1_clone = Arc::clone(&progress_1);
log::debug!("Begin r_process_remove_dir");
let cumulative_stats = r_process_remove_dir(&repo, path, dir_node, staged_db);
let mut parent_path = path.to_path_buf();
while let Some(parent) = parent_path.parent() {
let relative_path = util::fs::path_relative_to_dir(parent, repo_path.clone())?;
parent_path = parent.to_path_buf();
let Some(relative_path_str) = relative_path.to_str() else {
let error = format!("Error: {relative_path:?} is not a valid string");
return Err(OxenError::basic_str(error));
};
let dir_entry = StagedMerkleTreeNode {
status: StagedEntryStatus::Modified,
node: MerkleTreeNode::default_dir_from_path(&relative_path),
};
log::debug!("writing dir to staged db: {dir_entry}");
let mut buf = Vec::new();
dir_entry.serialize(&mut Serializer::new(&mut buf)).unwrap();
staged_db.put(relative_path_str, &buf).unwrap();
if relative_path == Path::new("") {
break;
}
}
progress_1_clone.finish_and_clear();
cumulative_stats
}
fn r_process_remove_dir(
_repo: &LocalRepository,
path: &Path,
node: &MerkleTreeNode,
staged_db: &DBWithThreadMode<MultiThreaded>,
) -> Result<CumulativeStats, OxenError> {
let mut total = CumulativeStats {
total_files: 0,
total_bytes: 0,
data_type_counts: HashMap::new(),
};
for child in &node.children {
match &child.node {
EMerkleTreeNode::Directory(dir_node) => {
log::debug!("Recursive process_remove_dir found dir: {dir_node}");
let new_path = path.join(dir_node.name());
total += r_process_remove_dir(_repo, &new_path, child, staged_db)?;
}
EMerkleTreeNode::VNode(_) => {
log::debug!("Recursive process_remove_dir found vnode");
total += r_process_remove_dir(_repo, path, child, staged_db)?;
}
EMerkleTreeNode::File(file_node) => {
log::debug!("Recursive process_remove_dir found file: {file_node}");
let new_path = path.join(file_node.name());
match process_remove_file(&new_path, file_node, staged_db) {
Ok(Some(node)) => {
if let EMerkleTreeNode::File(file_node) = &node.node.node {
total.total_bytes += file_node.num_bytes();
total.total_files += 1;
total
.data_type_counts
.entry(file_node.data_type().clone())
.and_modify(|count| *count += 1)
.or_insert(1);
}
}
Err(e) => {
let error = format!("Error adding file {new_path:?}: {e:?}");
return Err(OxenError::basic_str(error));
}
_ => {
let error = format!("Error adding file {new_path:?}");
return Err(OxenError::basic_str(error));
}
}
}
_ => {
let error = "Error: Unexpected node type".to_string();
return Err(OxenError::basic_str(error));
}
}
}
match &node.node {
EMerkleTreeNode::Directory(_) => {
let staged_entry = StagedMerkleTreeNode {
status: StagedEntryStatus::Removed,
node: node.clone(),
};
log::debug!("writing removed dir to staged db: {staged_entry}");
let mut buf = Vec::new();
staged_entry
.serialize(&mut Serializer::new(&mut buf))
.unwrap();
let relative_path_str = path.to_str().unwrap();
staged_db.put(relative_path_str, &buf).unwrap();
}
EMerkleTreeNode::VNode(_) => {}
_ => {
return Err(OxenError::basic_str(format!(
"Unexpected node type: {:?}",
node.node.node_type()
)));
}
}
Ok(total)
}