use bytes::BytesMut;
use futures::StreamExt;
use parking_lot::Mutex;
use reqwest::Client;
use reqwest::header::HeaderValue;
use reqwest::redirect;
use std::collections::HashSet;
use std::fs::File;
use std::io::{Read, Write};
use std::net::IpAddr;
use std::path::{Component, Path, PathBuf};
use std::sync::Arc;
use url::Url;
use zip::ZipArchive;
use crate::core;
use crate::core::staged::staged_db_manager::with_staged_db_manager;
use crate::core::v_latest::add::{
add_file_node_to_staged_db, get_file_node, process_add_file_with_staged_db_manager,
stage_file_with_hash,
};
use crate::error::OxenError;
use crate::model::file::TempFilePathNew;
use crate::model::merkle_tree::node::EMerkleTreeNode;
use crate::model::merkle_tree::node::MerkleTreeNode;
use crate::model::user::User;
use crate::model::workspace::Workspace;
use crate::model::{Branch, Commit, StagedEntryStatus};
use crate::model::{LocalRepository, NewCommitBody};
use crate::repositories;
use crate::util;
use crate::view::{ErrorFileInfo, FileWithHash};
/// Bytes of downloaded data accumulated in memory before flushing to disk in `fetch_file`;
/// also the read-buffer size used while extracting zip entries (256 KiB).
const BUFFER_SIZE_THRESHOLD: usize = 256 * 1024;
/// Largest download body `fetch_file` accepts (1 GiB).
const MAX_CONTENT_LENGTH: u64 = 1 << 30;
/// Largest total uncompressed size accepted from a zip archive (1 GiB).
const MAX_DECOMPRESSED_SIZE: u64 = 1 << 30;
/// Largest per-entry uncompressed/compressed size ratio accepted from a zip archive.
const MAX_COMPRESSION_RATIO: u64 = 100;
/// Stages the file at `filepath` in the workspace, diffing it against the workspace's commit.
///
/// Returns `filepath` expressed relative to the workspace repository root.
pub async fn add(workspace: &Workspace, filepath: impl AsRef<Path>) -> Result<PathBuf, OxenError> {
    let filepath = filepath.as_ref();
    let repo = &workspace.workspace_repo;
    let head = Some(workspace.commit.clone());
    p_add_file(&workspace.base_repo, repo, &head, filepath).await?;
    util::fs::path_relative_to_dir(filepath, &repo.path)
}
/// Stages the removal of `filepath` (a file or a directory) from the workspace.
///
/// Returns per-path failures. For example, asking to remove a path that does not exist in
/// the workspace's commit is reported here rather than as an `Err`.
pub async fn rm(
    workspace: &Workspace,
    filepath: impl AsRef<Path>,
) -> Result<Vec<ErrorFileInfo>, OxenError> {
    p_rm(
        &workspace.base_repo,
        &workspace.workspace_repo,
        &workspace.commit,
        filepath.as_ref(),
    )
    .await
}
/// Stages the already-stored version file at `version_path` as `dst_path` with hash `file_hash`.
///
/// Returns `dst_path` unchanged (as an owned `PathBuf`).
pub fn add_version_file(
    workspace: &Workspace,
    version_path: impl AsRef<Path>,
    dst_path: impl AsRef<Path>,
    file_hash: &str,
) -> Result<PathBuf, OxenError> {
    let version_path = version_path.as_ref();
    let dst_path = dst_path.as_ref();
    // Fresh set per call: no directories have been recorded as staged yet.
    let seen_dirs = Arc::new(Mutex::new(HashSet::new()));
    with_staged_db_manager(&workspace.workspace_repo, |manager| {
        stage_file_with_hash(
            workspace,
            version_path,
            dst_path,
            file_hash,
            manager,
            &seen_dirs,
        )
    })?;
    Ok(dst_path.to_owned())
}
pub async fn add_version_files(
repo: &LocalRepository,
workspace: &Workspace,
files_with_hash: &[FileWithHash],
directory: impl AsRef<str>,
) -> Result<Vec<ErrorFileInfo>, OxenError> {
let version_store = repo.version_store()?;
let directory = directory.as_ref();
let workspace_repo = &workspace.workspace_repo;
let seen_dirs = Arc::new(Mutex::new(HashSet::new()));
let mut version_paths = Vec::with_capacity(files_with_hash.len());
for item in files_with_hash.iter() {
version_paths.push(version_store.get_version_path(&item.hash).await?);
}
let mut err_files: Vec<ErrorFileInfo> = vec![];
with_staged_db_manager(workspace_repo, |staged_db_manager| {
for (item, version_path) in files_with_hash.iter().zip(version_paths.iter()) {
let target_path = PathBuf::from(directory).join(&item.path);
match stage_file_with_hash(
workspace,
version_path,
&target_path,
&item.hash,
staged_db_manager,
&seen_dirs,
) {
Ok(_) => {
}
Err(e) => {
log::error!("error with adding file: {e:?}");
err_files.push(ErrorFileInfo {
hash: item.hash.clone(),
path: Some(item.path.clone()),
error: format!("Failed to add file to staged db: {e}"),
});
continue;
}
}
}
log::debug!(
"add_version_files complete with {:?} err_files",
err_files.len()
);
Ok(err_files)
})
}
/// Marks `filepath` as modified in the workspace's staged db.
///
/// The file must already exist in the workspace's commit. Returns `filepath` relative to the
/// workspace repository root.
pub fn track_modified_data_frame(
    workspace: &Workspace,
    filepath: impl AsRef<Path>,
) -> Result<PathBuf, OxenError> {
    let filepath = filepath.as_ref();
    let repo = &workspace.workspace_repo;
    let head = Some(workspace.commit.clone());
    p_modify_file(&workspace.base_repo, repo, &head, filepath)?;
    util::fs::path_relative_to_dir(filepath, &repo.path)
}
/// Unstages every path in `paths`, continuing past failures.
///
/// Returns the paths that could not be unstaged. Their errors are only logged at debug level.
pub async fn remove_files_from_staged_db(
    workspace: &Workspace,
    paths: Vec<PathBuf>,
) -> Result<Vec<PathBuf>, OxenError> {
    let mut failed = Vec::new();
    for path in paths {
        if let Err(e) = unstage(workspace, &path) {
            log::debug!("Error removing file path {path:?}: {e:?}");
            failed.push(path);
        }
    }
    Ok(failed)
}
/// Deletes the staged-db entry for `path`, which is first made relative to the workspace root.
pub fn unstage(workspace: &Workspace, path: impl AsRef<Path>) -> Result<(), OxenError> {
    let repo = &workspace.workspace_repo;
    let relative = util::fs::path_relative_to_dir(path.as_ref(), &repo.path)?;
    with_staged_db_manager(repo, |manager| manager.delete_entry(&relative))
}
/// Reports whether the workspace's staged db holds an entry for `path`.
///
/// `path` is first made relative to the workspace repository root.
pub fn exists(workspace: &Workspace, path: impl AsRef<Path>) -> Result<bool, OxenError> {
    let repo = &workspace.workspace_repo;
    let relative = util::fs::path_relative_to_dir(path.as_ref(), &repo.path)?;
    with_staged_db_manager(repo, |manager| manager.exists(&relative))
}
/// Returns `true` when `ip` is an address an import must refuse to connect to.
///
/// IPv4 addresses are rejected when they are:
/// - loopback, RFC 1918 private, link-local, unspecified, or broadcast;
/// - flagged by `is_cgn_or_reserved_v4`.
///
/// IPv6 addresses are rejected when they are:
/// - loopback or unspecified;
/// - unique-local (`fc00::/7`);
/// - link-local (`fe80::/10`);
/// - documentation (`2001:db8::/32`).
///
/// NAT64 (`64:ff9b` leading segments), IPv4-mapped and IPv4-compatible IPv6 addresses are
/// unwrapped, and the embedded IPv4 address is judged by the IPv4 rules.
fn is_private_ip(ip: &IpAddr) -> bool {
    let v6 = match ip {
        IpAddr::V4(v4) => {
            return v4.is_loopback()
                || v4.is_private()
                || v4.is_link_local()
                || v4.is_unspecified()
                || v4.is_broadcast()
                || is_cgn_or_reserved_v4(v4.octets());
        }
        IpAddr::V6(v6) => v6,
    };

    if v6.is_loopback() || v6.is_unspecified() {
        return true;
    }

    let seg = v6.segments();
    let unique_local = (seg[0] & 0xfe00) == 0xfc00;
    let link_local = (seg[0] & 0xffc0) == 0xfe80;
    let documentation = seg[0] == 0x2001 && seg[1] == 0x0db8;
    if unique_local || link_local || documentation {
        return true;
    }

    // NAT64: the IPv4 destination is carried in the final 32 bits.
    if seg[0] == 0x0064 && seg[1] == 0xff9b {
        let [a, b] = seg[6].to_be_bytes();
        let [c, d] = seg[7].to_be_bytes();
        return is_private_ip(&IpAddr::V4(std::net::Ipv4Addr::new(a, b, c, d)));
    }

    v6.to_ipv4_mapped()
        .or_else(|| v6.to_ipv4())
        .is_some_and(|v4| is_private_ip(&IpAddr::V4(v4)))
}
/// Returns `true` for special-purpose IPv4 ranges that the stable `std::net::Ipv4Addr`
/// predicates used by `is_private_ip` do not cover, and that an import must never reach.
///
/// Covered ranges:
/// - `0.0.0.0/8`: "this network" (RFC 1122), never a valid destination.
/// - `100.64.0.0/10`: shared address space / carrier-grade NAT (RFC 6598).
/// - `192.0.0.0/24`: IETF protocol assignments (RFC 6890).
/// - `192.0.2.0/24`, `198.51.100.0/24`, `203.0.113.0/24`: documentation (RFC 5737).
/// - `198.18.0.0/15`: benchmarking (RFC 2544).
/// - `224.0.0.0/4` multicast and `240.0.0.0/4` reserved, including `255.255.255.255`.
fn is_cgn_or_reserved_v4(octets: [u8; 4]) -> bool {
    let [a, b, c, _] = octets;
    match a {
        0 => true,
        100 => (b & 0xC0) == 64,
        192 => b == 0 && (c == 0 || c == 2),
        198 => (b & 0xFE) == 18 || (b == 51 && c == 100),
        203 => b == 0 && c == 113,
        // Multicast and the reserved "class E" block are not legitimate unicast fetch targets;
        // some internal/cloud networks route 240/4, so it must not be reachable via import.
        224..=255 => true,
        _ => false,
    }
}
async fn validate_url_target(url: &Url) -> Result<(), OxenError> {
let host = url
.host_str()
.ok_or_else(|| OxenError::file_import_error("URL has no host"))?;
let port = url.port_or_known_default().unwrap_or(443);
let addr = format!("{host}:{port}");
let resolved = tokio::net::lookup_host(&addr).await.map_err(|e| {
OxenError::file_import_error(format!("DNS resolution failed for {host}: {e}"))
})?;
for socket_addr in resolved {
if is_private_ip(&socket_addr.ip()) {
return Err(OxenError::file_import_error(format!(
"URL resolves to a private/reserved IP address: {}",
socket_addr.ip()
)));
}
}
Ok(())
}
/// Extracts the `filename=` parameter from a `Content-Disposition` header value.
///
/// The parameter name is matched case-insensitively.
/// - A quoted value is returned up to (not including) its closing quote. An unterminated
///   quote yields `None`.
/// - An unquoted value runs to the next `;` and is trimmed.
/// - An empty value, quoted or not, yields `None` so callers fall back to another name source.
///
/// `filename*=` (RFC 5987 extended notation) and backslash escapes inside quotes are not
/// interpreted.
fn parse_content_disposition_filename(header: &str) -> Option<String> {
    const KEY: &str = "filename=";
    // ASCII-only lowercasing keeps every byte offset identical to `header`. Full Unicode
    // `to_lowercase` can change byte lengths, which made the slice below misaligned or panic.
    let pos = header.to_ascii_lowercase().find(KEY)?;
    let rest = &header[pos + KEY.len()..];
    let name = match rest.strip_prefix('"') {
        Some(quoted) => &quoted[..quoted.find('"')?],
        None => rest[..rest.find(';').unwrap_or(rest.len())].trim(),
    };
    (!name.is_empty()).then(|| name.to_string())
}
/// Returns the percent-decoded last path segment of `url`.
///
/// Returns `None` when the URL has no path segments, the last segment is empty (trailing
/// `/`), or the segment does not decode to valid UTF-8.
fn filename_from_url(url: &Url) -> Option<String> {
    let last_segment = url.path_segments()?.next_back()?;
    if last_segment.is_empty() {
        return None;
    }
    urlencoding::decode(last_segment)
        .ok()
        .map(|decoded| decoded.into_owned())
}
/// Reduces `name` to a safe single path component.
///
/// Whitespace becomes `_`. Every character except alphanumerics, `.`, `-` and `_` is then
/// dropped, which removes path separators. A result made only of dots (`.`, `..`, ...) would
/// refer to the directory itself or its parent when joined onto a path, so it is returned as
/// an empty string. Callers treat an empty result as "no usable filename".
fn sanitize_filename(name: &str) -> String {
    let cleaned: String = name
        .chars()
        .map(|c| if c.is_whitespace() { '_' } else { c })
        .filter(|&c| c.is_alphanumeric() || c == '.' || c == '-' || c == '_')
        .collect();
    if cleaned.chars().all(|c| c == '.') {
        String::new()
    } else {
        cleaned
    }
}
/// Downloads `url` into `directory` of the workspace and stages the result.
///
/// Only `http`/`https` URLs are accepted, and the target host must not resolve to a
/// private/reserved address (`validate_url_target`). `auth` is sent verbatim as the
/// `Authorization` header of the initial request.
///
/// # Errors
/// - The URL is invalid, uses another scheme, or targets a private address.
/// - `auth` is not a valid header value.
/// - Any error from the download/stage step.
pub async fn import(
    url: &str,
    auth: &str,
    directory: PathBuf,
    filename: Option<String>,
    workspace: &Workspace,
) -> Result<(), OxenError> {
    let parsed_url =
        Url::parse(url).map_err(|_| OxenError::file_import_error(format!("Invalid URL: {url}")))?;
    let scheme = parsed_url.scheme();
    if scheme != "http" && scheme != "https" {
        return Err(OxenError::file_import_error(
            "Only http and https URLs are allowed",
        ));
    }
    validate_url_target(&parsed_url).await?;

    // `auth` is a credential: never echo it into an error message (which may be logged or
    // returned to a client).
    let mut auth_header_value = HeaderValue::from_str(auth)
        .map_err(|_| OxenError::file_import_error("Invalid header auth value"))?;
    // Lets the HTTP stack redact the value from Debug output.
    auth_header_value.set_sensitive(true);

    fetch_file(
        &parsed_url,
        auth_header_value,
        directory,
        filename,
        workspace,
    )
    .await
}
/// Extracts each uploaded zip into place, stages every extracted path, then commits the
/// workspace to `branch`.
///
/// Paths with a `__MACOSX` component (macOS archive metadata) are skipped. Commit failures are
/// logged and returned unchanged. `OxenError::WorkspaceBehind` gets its own log line.
pub async fn upload_zip(
    commit_message: &str,
    user: &User,
    temp_files: Vec<TempFilePathNew>,
    workspace: &Workspace,
    branch: &Branch,
) -> Result<Commit, OxenError> {
    for temp_file in temp_files {
        let extracted = decompress_zip(&temp_file.temp_file_path)?;
        for file in &extracted {
            let is_macos_metadata = file
                .components()
                .any(|part| part.as_os_str().to_string_lossy() == "__MACOSX");
            if is_macos_metadata {
                log::debug!("Skipping __MACOSX file: {file:?}");
                continue;
            }
            repositories::workspaces::files::add(workspace, file).await?;
        }
    }

    let body = NewCommitBody {
        message: commit_message.to_string(),
        author: user.name.clone(),
        email: user.email.clone(),
    };
    match repositories::workspaces::commit(workspace, &body, &branch.name).await {
        Ok(commit) => {
            log::debug!("workspace::commit ✅ success! commit {commit:?}");
            Ok(commit)
        }
        Err(OxenError::WorkspaceBehind(behind)) => {
            log::error!(
                "unable to commit branch {:?}. Workspace behind",
                branch.name
            );
            Err(OxenError::WorkspaceBehind(behind))
        }
        Err(err) => {
            log::error!("unable to commit branch {:?}. Err: {}", branch.name, err);
            Err(err)
        }
    }
}
/// Maximum number of HTTP redirects followed (by hand) during a URL import before giving up.
const MAX_REDIRECTS: usize = 10;
async fn fetch_file(
url: &Url,
auth_header_value: HeaderValue,
directory: PathBuf,
caller_filename: Option<String>,
workspace: &Workspace,
) -> Result<(), OxenError> {
let client = Client::builder()
.redirect(redirect::Policy::none())
.build()
.map_err(|e| OxenError::file_import_error(format!("Failed to build HTTP client: {e}")))?;
let mut current_url = url.clone();
let mut response = None;
for hop in 0..=MAX_REDIRECTS {
let mut req = client.get(current_url.as_str());
if hop == 0 {
req = req.header("Authorization", auth_header_value.clone());
}
let resp = req
.send()
.await
.map_err(|e| OxenError::file_import_error(format!("Fetch file request failed: {e}")))?;
let status = resp.status();
if status.is_redirection() {
if hop == MAX_REDIRECTS {
return Err(OxenError::file_import_error("Too many redirects (max 10)"));
}
let location = resp
.headers()
.get("location")
.and_then(|v| v.to_str().ok())
.ok_or_else(|| {
OxenError::file_import_error("Redirect response missing Location header")
})?;
let next_url = current_url
.join(location)
.map_err(|e| OxenError::file_import_error(format!("Invalid redirect URL: {e}")))?;
let scheme = next_url.scheme();
if scheme != "http" && scheme != "https" {
return Err(OxenError::file_import_error(
"Redirect to non-HTTP(S) URL is not allowed",
));
}
validate_url_target(&next_url).await?;
current_url = next_url;
continue;
}
if !status.is_success() {
return Err(OxenError::file_import_error(format!(
"HTTP request failed with status {status}"
)));
}
response = Some(resp);
break;
}
let response = response
.ok_or_else(|| OxenError::file_import_error("Failed to get a successful response"))?;
let resp_headers = response.headers();
let content_type = resp_headers
.get("content-type")
.and_then(|h| h.to_str().ok())
.unwrap_or("application/octet-stream");
let content_length = response.content_length();
if let Some(content_length) = content_length
&& content_length > MAX_CONTENT_LENGTH
{
return Err(OxenError::file_import_error(format!(
"Content length {content_length} exceeds maximum allowed size of 1GB"
)));
}
let raw_filename = caller_filename.clone().unwrap_or_else(|| {
resp_headers
.get("content-disposition")
.and_then(|h| h.to_str().ok())
.and_then(parse_content_disposition_filename) .or_else(|| filename_from_url(¤t_url)) .unwrap_or_else(|| uuid::Uuid::new_v4().to_string()) });
let filename = sanitize_filename(&raw_filename);
if filename.is_empty() {
return Err(OxenError::file_import_error(format!(
"Could not determine a valid filename for {url}"
)));
}
let is_zip = content_type.contains("zip");
log::debug!("files::import_file Got filename : {filename:?}");
let filepath = directory.join(&filename);
log::debug!("files::import_file got download filepath: {filepath:?}");
let mut stream = response.bytes_stream();
let mut buffer = BytesMut::new();
let mut save_path = PathBuf::new();
let mut bytes_downloaded: u64 = 0;
while let Some(chunk) = stream.next().await {
let chunk = chunk.map_err(|_| OxenError::file_import_error("Error reading file stream"))?;
let processed_chunk = chunk.to_vec();
buffer.extend_from_slice(&processed_chunk);
bytes_downloaded += processed_chunk.len() as u64;
if bytes_downloaded > MAX_CONTENT_LENGTH {
delete_file(workspace, &filepath)?;
return Err(OxenError::file_import_error(
"Content length exceeds maximum allowed size of 1GB",
));
}
if buffer.len() > BUFFER_SIZE_THRESHOLD {
save_path = save_stream(workspace, &filepath, buffer.split().freeze().to_vec())
.await
.map_err(|e| {
OxenError::file_import_error(format!(
"Error occurred when saving file stream: {e}"
))
})?;
}
}
if !buffer.is_empty() {
save_path = save_stream(workspace, &filepath, buffer.freeze().to_vec())
.await
.map_err(|e| {
OxenError::file_import_error(format!("Error occurred when saving file stream: {e}"))
})?;
}
log::debug!("workspace::files::import_file save_path is {save_path:?}");
if let Some(content_length) = content_length {
let bytes_written = if save_path.exists() {
util::fs::metadata(&save_path)?.len()
} else {
0
};
log::debug!(
"workspace::files::import_file has written {bytes_written:?} bytes. It's expecting {content_length:?} bytes"
);
if bytes_written != content_length {
return Err(OxenError::file_import_error(
"Content length does not match. File incomplete.",
));
}
}
if is_zip {
let files = decompress_zip(&save_path)?;
log::debug!("workspace::files::import_file unzipped file");
for file in files.iter() {
log::debug!("file::import add file {file:?}");
let path = repositories::workspaces::files::add(workspace, file).await?;
log::debug!("file::import add file ✅ success! staged file {path:?}");
}
} else {
log::debug!("file::import add file {:?}", &filepath);
let path = repositories::workspaces::files::add(workspace, &save_path).await?;
log::debug!("file::import add file ✅ success! staged file {path:?}");
}
Ok(())
}
/// Removes `path` from the workspace working directory if it exists there.
///
/// `path` is made relative to the workspace repository root and re-joined onto it before
/// removal. A missing file is not an error.
fn delete_file(workspace: &Workspace, path: impl AsRef<Path>) -> Result<(), OxenError> {
    let repo_root = &workspace.workspace_repo.path;
    let relative = util::fs::path_relative_to_dir(path.as_ref(), repo_root)?;
    let full_path = repo_root.join(&relative);
    if !full_path.exists() {
        return Ok(());
    }
    std::fs::remove_file(&full_path).map_err(|e| {
        OxenError::file_import_error(format!(
            "Failed to remove file {}: {}",
            full_path.display(),
            e
        ))
    })
}
pub async fn save_stream(
workspace: &Workspace,
filepath: &PathBuf,
chunk: Vec<u8>,
) -> Result<PathBuf, OxenError> {
log::debug!(
"liboxen::workspace::files::save_stream writing {} bytes to file",
chunk.len()
);
let workspace_dir = workspace.dir();
log::debug!("liboxen::workspace::files::save_stream Got workspace dir: {workspace_dir:?}");
let full_dir = workspace_dir.join(filepath);
log::debug!("liboxen::workspace::files::save_stream Got full dir: {full_dir:?}");
if let Some(parent) = full_dir.parent() {
std::fs::create_dir_all(parent)?;
}
log::debug!(
"liboxen::workspace::files::save_stream successfully created full dir: {full_dir:?}"
);
let full_dir_cpy = full_dir.clone();
let mut file = tokio::task::spawn_blocking(move || {
std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(full_dir_cpy)
})
.await
.map_err(|e| OxenError::basic_str(format!("spawn_blocking join error: {e}")))??;
log::debug!("liboxen::workspace::files::save_stream is writing to file: {file:?}");
tokio::task::spawn_blocking(move || file.write_all(&chunk).map(|_| file))
.await
.map_err(|e| OxenError::basic_str(format!("spawn_blocking join error: {e}")))??;
Ok(full_dir)
}
/// Extracts every entry of the zip archive at `zip_filepath` next to the archive, then deletes
/// the archive.
///
/// Pass 1 screens the sizes declared in the entry headers before anything is written:
/// - each entry's uncompressed/compressed ratio must not exceed `MAX_COMPRESSION_RATIO`;
/// - the declared total must not exceed `MAX_DECOMPRESSED_SIZE`.
///
/// Declared sizes are attacker-controlled, so pass 2 also counts the bytes actually produced
/// during extraction and aborts past the same limit.
///
/// Entry names go through `mangled_name`. Whitespace becomes `_`, and the result must pass
/// `sanitize_path` and stay under the extraction directory.
///
/// # Returns
/// The paths of all extracted entries, directories included.
///
/// # Errors
/// - I/O errors.
/// - An unreadable archive.
/// - A suspicious compression ratio or oversized content.
/// - A path-traversal attempt.
pub fn decompress_zip(zip_filepath: &PathBuf) -> Result<Vec<PathBuf>, OxenError> {
    let file = File::open(zip_filepath)?;
    let mut archive = ZipArchive::new(file)
        .map_err(|e| OxenError::basic_str(format!("Failed to access zip file: {e}")))?;

    // Pass 1: screen declared (header) sizes before touching the filesystem.
    let mut total_size: u64 = 0;
    for i in 0..archive.len() {
        let zip_file = archive.by_index(i).map_err(|e| {
            OxenError::basic_str(format!("Failed to access zip file at index {i}: {e}"))
        })?;
        let uncompressed_size = zip_file.size();
        let compressed_size = zip_file.compressed_size();
        if compressed_size > 0 {
            let compression_ratio = uncompressed_size / compressed_size;
            if compression_ratio > MAX_COMPRESSION_RATIO {
                return Err(OxenError::basic_str(format!(
                    "Suspicious zip compression ratio: {compression_ratio} detected"
                )));
            }
        } else if uncompressed_size > 0 {
            return Err(OxenError::basic_str(
                "Suspicious zip file: compressed size is 0 but uncompressed size is not",
            ));
        }
        // Saturate so a forged huge header value cannot wrap the total back under the limit.
        total_size = total_size.saturating_add(uncompressed_size);
        if total_size > MAX_DECOMPRESSED_SIZE {
            return Err(OxenError::file_import_error(
                "Decompressed size exceeds size limit of 1GB",
            ));
        }
    }

    log::debug!("liboxen::files::decompress_zip zip filepath is {zip_filepath:?}");
    // A bare relative filename has an empty (not absent) parent, which `canonicalize` rejects.
    // Treat it like "no parent" and extract into the current directory.
    let parent = match zip_filepath.parent() {
        Some(p) if !p.as_os_str().is_empty() => p.canonicalize()?,
        _ => std::env::current_dir()?,
    };

    // Pass 2: extract, reusing one read buffer and counting real output bytes.
    let mut files: Vec<PathBuf> = Vec::with_capacity(archive.len());
    let mut buffer = vec![0u8; BUFFER_SIZE_THRESHOLD];
    let mut bytes_extracted: u64 = 0;
    for i in 0..archive.len() {
        let mut zip_file = archive.by_index(i).map_err(|e| {
            OxenError::basic_str(format!("Failed to access zip file at index {i}: {e}"))
        })?;
        let mut zipfile_name = zip_file.mangled_name();
        if let Some(zipfile_name_str) = zipfile_name.to_str()
            && zipfile_name_str.chars().any(|c| c.is_whitespace())
        {
            let new_name = zipfile_name_str
                .chars()
                .map(|c| if c.is_whitespace() { '_' } else { c })
                .collect::<String>();
            zipfile_name = PathBuf::from(new_name);
        }
        let safe_path = sanitize_path(&zipfile_name)?;
        let outpath = parent.join(&safe_path);
        if !outpath.starts_with(&parent) {
            return Err(OxenError::basic_str(format!(
                "Attempted path traversal detected: {outpath:?}"
            )));
        }
        log::debug!("files::decompress_zip unzipping file to: {outpath:?}");
        if let Some(outdir) = outpath.parent() {
            util::fs::create_dir_all(outdir)?;
        }
        if zip_file.is_dir() {
            util::fs::create_dir_all(&outpath)?;
        } else {
            let mut outfile = File::create(&outpath)?;
            loop {
                let n = zip_file.read(&mut buffer)?;
                if n == 0 {
                    break;
                }
                bytes_extracted = bytes_extracted.saturating_add(n as u64);
                if bytes_extracted > MAX_DECOMPRESSED_SIZE {
                    return Err(OxenError::file_import_error(
                        "Decompressed size exceeds size limit of 1GB",
                    ));
                }
                outfile.write_all(&buffer[..n])?;
            }
        }
        files.push(outpath);
    }
    log::debug!("files::decompress_zip removing zip file: {zip_filepath:?}");
    std::fs::remove_file(zip_filepath)?;
    Ok(files)
}
/// Rebuilds `path` from its normal components only, dropping `.` components.
///
/// Any `..`, root, or (Windows) prefix component is rejected as an invalid zip entry path.
fn sanitize_path(path: &PathBuf) -> Result<PathBuf, OxenError> {
    path.components()
        .filter(|component| !matches!(component, Component::CurDir))
        .map(|component| match component {
            Component::Normal(part) => Ok(part),
            _ => Err(OxenError::basic_str(format!(
                "Invalid path component in zip file: {path:?}"
            ))),
        })
        .collect()
}
/// Stores `path`'s content in the base repo's version store and stages it in the workspace.
///
/// When a head commit is given, the file's parent directory node from that commit is used to
/// decide the file's staged status. Paths that are not regular files in the workspace are
/// silently skipped. Current merge conflicts are passed along to the staging step.
async fn p_add_file(
    base_repo: &LocalRepository,
    workspace_repo: &LocalRepository,
    maybe_head_commit: &Option<Commit>,
    path: &Path,
) -> Result<(), OxenError> {
    let version_store = base_repo.version_store()?;
    let relative_path = util::fs::path_relative_to_dir(path, &workspace_repo.path)?;

    let maybe_dir_node = match maybe_head_commit {
        Some(head_commit) => {
            let parent_path = relative_path.parent().unwrap_or(Path::new(""));
            repositories::tree::get_dir_with_children(base_repo, head_commit, parent_path, None)?
        }
        None => None,
    };

    let full_path = workspace_repo.path.join(&relative_path);
    if !full_path.is_file() {
        log::debug!("is not a file - skipping add on {full_path:?}");
        return Ok(());
    }

    let file_name = path.file_name().unwrap_or_default().to_string_lossy();
    let file_status =
        core::v_latest::add::determine_file_status(&maybe_dir_node, &file_name, &full_path)?;
    let hash_str = file_status.hash.to_string();
    version_store
        .store_version_from_path(&hash_str, &full_path)
        .await?;

    let conflicts: HashSet<PathBuf> = repositories::merge::list_conflicts(workspace_repo)?
        .into_iter()
        .map(|conflict| conflict.merge_entry.path)
        .collect();
    let seen_dirs = Arc::new(Mutex::new(HashSet::new()));
    process_add_file_with_staged_db_manager(
        workspace_repo,
        &workspace_repo.path,
        &file_status,
        path,
        &seen_dirs,
        &conflicts,
    )
}
/// Stages the removal of `path`, looked up in `commit`'s tree.
///
/// - A committed file is staged as removed via `remove_file_with_db_manager`, and any
///   per-file errors it reports are returned.
/// - A committed directory is removed recursively.
/// - Anything else is reported as an error entry (not an `Err`).
async fn p_rm(
    base_repo: &LocalRepository,
    workspace_repo: &LocalRepository,
    commit: &Commit,
    path: &Path,
) -> Result<Vec<ErrorFileInfo>, OxenError> {
    log::debug!("p_rm: deleting file {path:?}");
    let relative_path = util::fs::path_relative_to_dir(path, &workspace_repo.path)?;
    // NOTE(review): the tree lookup uses the parent of `path` as given, not of `relative_path`
    // (unlike `p_add_file`). This only matches when callers pass repo-relative paths; confirm.
    let parent_path = path.parent().unwrap_or(Path::new(""));
    let maybe_dir_node =
        repositories::tree::get_dir_with_children(base_repo, commit, parent_path, None)?;
    let file_name = util::fs::path_relative_to_dir(path, parent_path)?;
    let seen_dirs = Arc::new(Mutex::new(HashSet::new()));

    if let Some(mut file_node) = get_file_node(&maybe_dir_node, &file_name)? {
        file_node.set_name(&path.to_string_lossy());
        let mut err_files: Vec<ErrorFileInfo> = Vec::new();
        err_files.extend(core::v_latest::rm::remove_file_with_db_manager(
            workspace_repo,
            &relative_path,
            &file_node,
            &seen_dirs,
        )?);
        return Ok(err_files);
    }

    if !has_dir_node(&maybe_dir_node, &file_name)? {
        return Ok(vec![ErrorFileInfo {
            hash: "".to_string(),
            path: Some(path.to_path_buf()),
            error: "Cannot call `oxen rm` on uncommitted files".to_string(),
        }]);
    }

    let maybe_full_dir = repositories::tree::get_dir_with_children_recursive(
        base_repo,
        commit,
        &relative_path,
        None,
    )?;
    if let Some(dir_node) = maybe_full_dir {
        core::v_latest::rm::remove_dir_with_db_manager(
            workspace_repo,
            &dir_node,
            &relative_path,
            &seen_dirs,
        )?;
    }
    Ok(Vec::new())
}
/// Stages `path` as `Modified`, reusing its file node from `maybe_head_commit`.
///
/// # Errors
/// Returns `"file not found in head commit"` when there is no head commit or the file is
/// absent from it. Lookup and staging errors are propagated.
fn p_modify_file(
    base_repo: &LocalRepository,
    workspace_repo: &LocalRepository,
    maybe_head_commit: &Option<Commit>,
    path: &Path,
) -> Result<(), OxenError> {
    let maybe_file_node = match maybe_head_commit {
        Some(head_commit) => repositories::tree::get_file_by_path(base_repo, head_commit, path)?,
        None => None,
    };
    let Some(mut file_node) = maybe_file_node else {
        return Err(OxenError::basic_str("file not found in head commit"));
    };
    // Lossy conversion (as `p_rm` does) instead of `to_str().unwrap()`, so a non-UTF-8 path
    // cannot panic.
    file_node.set_name(&path.to_string_lossy());
    log::debug!("p_modify_file file_node: {file_node}");
    let seen_dirs = Arc::new(Mutex::new(HashSet::new()));
    add_file_node_to_staged_db(
        workspace_repo,
        path,
        StagedEntryStatus::Modified,
        &file_node,
        &seen_dirs,
    )
}
/// Returns `true` when `path`, looked up beneath `dir_node`, is a directory node.
///
/// The result is `Ok(false)` when `dir_node` is `None`, the path is not found, or the path is
/// found but is not a directory. Errors from the lookup itself are propagated.
fn has_dir_node(
    dir_node: &Option<MerkleTreeNode>,
    path: impl AsRef<Path>,
) -> Result<bool, OxenError> {
    let Some(root) = dir_node else {
        return Ok(false);
    };
    let found = root.get_by_path(path)?;
    Ok(found.is_some_and(|node| matches!(node.node, EMerkleTreeNode::Directory(_))))
}
/// Stages a move of `path` to `new_path` within the workspace.
///
/// The source node is taken from the staged db if staged, otherwise from the workspace's
/// commit.
///
/// The destination is staged as:
/// - `Modified` if a file already exists there in the commit;
/// - `Added` otherwise.
///
/// When the paths differ, the source is handled as follows:
/// - if it exists in the commit, it is staged as `Removed`;
/// - otherwise its staged entry is simply deleted.
///
/// Parent directories of both paths are recorded in the staged db. Returns `new_path` relative
/// to the workspace repository root.
///
/// # Errors
/// - The source cannot be found.
/// - The destination is already staged.
/// - Any staged-db or tree lookup error.
pub fn mv(
    workspace: &Workspace,
    path: impl AsRef<Path>,
    new_path: impl AsRef<Path>,
) -> Result<PathBuf, OxenError> {
    let path = path.as_ref();
    let new_path = new_path.as_ref();
    let workspace_repo = &workspace.workspace_repo;
    let staged_entry = with_staged_db_manager(workspace_repo, |staged_db_manager| {
        staged_db_manager.read_from_staged_db(path)
    })?;
    let file_node = if let Some(entry) = staged_entry {
        entry.node.file()?
    } else {
        repositories::tree::get_file_by_path(&workspace.base_repo, &workspace.commit, path)?
            .ok_or_else(|| OxenError::path_does_not_exist(path))?
    };
    let is_same_path = path == new_path;
    let mut new_file_node = file_node.clone();
    // Lossy conversion instead of `to_str().unwrap()`, so a non-UTF-8 path cannot panic.
    new_file_node.set_name(&new_path.to_string_lossy());
    let dest_exists_in_base =
        repositories::tree::get_file_by_path(&workspace.base_repo, &workspace.commit, new_path)?
            .is_some();
    let new_status = if dest_exists_in_base {
        StagedEntryStatus::Modified
    } else {
        StagedEntryStatus::Added
    };
    let seen_dirs = Arc::new(Mutex::new(HashSet::new()));
    with_staged_db_manager(workspace_repo, |staged_db_manager| {
        if staged_db_manager.read_from_staged_db(new_path)?.is_some() {
            return Err(OxenError::basic_str(format!(
                "Destination already staged: {new_path:?}"
            )));
        }
        staged_db_manager.upsert_file_node(new_path, new_status, &new_file_node)?;
        if !is_same_path {
            let source_exists_in_base = repositories::tree::get_file_by_path(
                &workspace.base_repo,
                &workspace.commit,
                path,
            )?
            .is_some();
            if source_exists_in_base {
                let mut removed_file_node = file_node.clone();
                removed_file_node.set_name(&path.to_string_lossy());
                staged_db_manager.upsert_file_node(
                    path,
                    StagedEntryStatus::Removed,
                    &removed_file_node,
                )?;
                if let Some(parents) = path.parent() {
                    for dir in parents.ancestors() {
                        staged_db_manager.add_directory(dir, &seen_dirs)?;
                        if dir == Path::new("") {
                            break;
                        }
                    }
                }
            } else {
                // Never committed: the staged entry was the only record of the source.
                staged_db_manager.delete_entry(path)?;
            }
        }
        if let Some(parents) = new_path.parent() {
            for dir in parents.ancestors() {
                staged_db_manager.add_directory(dir, &seen_dirs)?;
                if dir == Path::new("") {
                    break;
                }
            }
        }
        Ok(())
    })?;
    let relative_path = util::fs::path_relative_to_dir(new_path, &workspace_repo.path)?;
    Ok(relative_path)
}