use crate::api;
use crate::constants::{self, DEFAULT_BRANCH_NAME, HISTORY_DIR, VERSIONS_DIR};
use crate::core::db;
use crate::core::db::{kv_db, path_db};
use crate::core::index::{CommitDirEntryWriter, RefWriter, SchemaWriter};
use crate::error::OxenError;
use crate::model::schema::Schema;
use crate::model::{
Commit, CommitEntry, LocalRepository, StagedData, StagedEntry, StagedEntryStatus,
};
use crate::util;
use crate::util::progress_bar::{oxen_progress_bar, ProgressBarType};
use filetime::FileTime;
use rayon::prelude::*;
use rocksdb::{DBWithThreadMode, MultiThreaded};
use std::collections::HashMap;
use std::fs;
use std::path::{Path, PathBuf};
use super::{CommitDirEntryReader, CommitEntryReader};
pub struct CommitEntryWriter {
repository: LocalRepository,
dir_db: DBWithThreadMode<MultiThreaded>,
commit: Commit,
}
impl CommitEntryWriter {
pub fn versions_dir(path: &Path) -> PathBuf {
util::fs::oxen_hidden_dir(path).join(Path::new(VERSIONS_DIR))
}
pub fn commit_dir(path: &Path, commit_id: &str) -> PathBuf {
util::fs::oxen_hidden_dir(path)
.join(Path::new(HISTORY_DIR))
.join(commit_id)
}
pub fn commit_dir_db(path: &Path, commit_id: &str) -> PathBuf {
CommitEntryWriter::commit_dir(path, commit_id).join(constants::DIRS_DIR)
}
pub fn new(
repository: &LocalRepository,
commit: &Commit,
) -> Result<CommitEntryWriter, OxenError> {
log::debug!("CommitEntryWriter::new() commit_id: {}", commit.id);
let db_path = CommitEntryWriter::commit_dir_db(&repository.path, &commit.id);
if !db_path.exists() {
util::fs::create_dir_all(&db_path)?;
}
let opts = db::opts::default();
Ok(CommitEntryWriter {
repository: repository.clone(),
dir_db: DBWithThreadMode::open(&opts, dunce::simplified(&db_path))?,
commit: commit.to_owned(),
})
}
pub fn copy_parent_dbs(
&self,
repo: &LocalRepository,
parent_ids: &Vec<String>,
) -> Result<(), OxenError> {
if parent_ids.is_empty() {
let ref_writer = RefWriter::new(repo)?;
ref_writer.create_branch(DEFAULT_BRANCH_NAME, &self.commit.id)?;
ref_writer.set_head(DEFAULT_BRANCH_NAME);
}
log::debug!(
"copy_parent_dbs {} -> '{}'",
self.commit.id,
self.commit.message
);
for parent_id in parent_ids {
let parent_commit = api::local::commits::get_by_id(repo, parent_id)?
.ok_or(OxenError::revision_not_found(parent_id.to_owned().into()))?;
log::debug!(
"copy parent {} -> '{}'",
parent_commit.id,
parent_commit.message
);
let reader = CommitEntryReader::new(repo, &parent_commit)?;
self.write_entries_from_reader(&reader)?;
}
Ok(())
}
fn write_entries_from_reader(&self, reader: &CommitEntryReader) -> Result<(), OxenError> {
let dirs = reader.list_dirs()?;
for dir in dirs {
let writer = CommitDirEntryWriter::new(&self.repository, &self.commit.id, &dir)?;
path_db::put(&self.dir_db, &dir, &0)?;
let dir_reader = CommitDirEntryReader::new(&self.repository, &reader.commit_id, &dir)?;
let entries = dir_reader.list_entries()?;
log::debug!(
"write_entries_from_reader got {} entries for dir {:?}",
entries.len(),
dir
);
entries.par_iter().for_each(|entry| {
log::debug!("copy entry {:?} -> {:?}", dir, entry.path);
match writer.add_commit_entry(entry) {
Ok(_) => {}
Err(err) => {
log::error!("write_entries_from_reader {err:?}");
}
}
});
}
Ok(())
}
pub fn set_file_timestamps(
&self,
entry: &CommitEntry,
time: &FileTime,
) -> Result<(), OxenError> {
if let Some(parent) = entry.path.parent() {
let writer = CommitDirEntryWriter::new(&self.repository, &self.commit.id, parent)?;
writer.set_file_timestamps(entry, time)
} else {
Err(OxenError::file_has_no_parent(&entry.path))
}
}
fn add_staged_entry_to_db(
&self,
writer: &CommitDirEntryWriter,
new_commit: &Commit,
origin_path: &Path,
file_path: &Path,
) -> Result<(), OxenError> {
let full_path = origin_path.join(file_path);
let metadata = fs::metadata(&full_path).unwrap();
let mtime = FileTime::from_last_modification_time(&metadata);
let metadata = fs::metadata(&full_path)?;
let hash = util::hasher::hash_file_contents(&full_path)?;
let entry = CommitEntry {
commit_id: new_commit.id.to_owned(),
path: file_path.to_path_buf(),
hash: hash.to_owned(),
num_bytes: metadata.len(),
last_modified_seconds: mtime.unix_seconds(),
last_modified_nanoseconds: mtime.nanoseconds(),
};
self.add_commit_entry(origin_path, writer, entry)?;
Ok(())
}
fn add_commit_entry(
&self,
origin_path: &Path,
writer: &CommitDirEntryWriter,
commit_entry: CommitEntry,
) -> Result<(), OxenError> {
let entry = self.backup_file_to_versions_dir(origin_path, commit_entry)?;
log::debug!(
"add_commit_entry with hash {:?} -> {}",
entry.path,
entry.hash
);
writer.add_commit_entry(&entry)
}
fn backup_file_to_versions_dir(
&self,
origin_path: &Path, commit_entry: CommitEntry,
) -> Result<CommitEntry, OxenError> {
let full_path = origin_path.join(&commit_entry.path);
log::debug!(
"backup_file_to_versions_dir {:?} -> {:?}",
commit_entry.path,
full_path
);
let versions_entry_path = util::fs::version_path(&self.repository, &commit_entry);
let versions_entry_dir = versions_entry_path.parent().unwrap();
log::debug!(
"Copying commit entry for file: {:?} -> {:?}",
commit_entry.path,
versions_entry_path
);
if !versions_entry_dir.exists() {
std::fs::create_dir_all(versions_entry_dir)?;
}
util::fs::copy(full_path, versions_entry_path)?;
Ok(commit_entry)
}
pub fn commit_staged_entries(
&self,
commit: &Commit,
staged_data: &StagedData,
origin_path: &Path,
) -> Result<(), OxenError> {
self.copy_parent_dbs(&self.repository, &commit.parent_ids.clone())?;
self.commit_staged_entries_with_prog(commit, staged_data, origin_path)?;
self.commit_schemas(commit, &staged_data.staged_schemas)
}
fn commit_schemas(
&self,
commit: &Commit,
schemas: &HashMap<PathBuf, Schema>,
) -> Result<(), OxenError> {
log::debug!("commit_schemas got {} schemas", schemas.len());
let schema_writer = SchemaWriter::new(&self.repository, &commit.id)?;
for (path, schema) in schemas.iter() {
if !schema_writer.has_schema(schema) {
schema_writer.put_schema(schema)?;
}
schema_writer.put_schema_for_file(path, schema)?;
}
Ok(())
}
fn group_staged_files_to_dirs(
&self,
files: &HashMap<PathBuf, StagedEntry>,
) -> HashMap<PathBuf, Vec<(PathBuf, StagedEntry)>> {
let mut results: HashMap<PathBuf, Vec<(PathBuf, StagedEntry)>> = HashMap::new();
for (path, entry) in files.iter() {
if let Some(parent) = path.parent() {
results
.entry(parent.to_path_buf())
.or_default()
.push((path.clone(), entry.clone()));
}
}
results
}
fn commit_staged_entries_with_prog(
&self,
commit: &Commit,
staged_data: &StagedData,
origin_path: &Path,
) -> Result<(), OxenError> {
let size: u64 = unsafe { std::mem::transmute(staged_data.staged_files.len()) };
if size == 0 {
return Ok(());
}
let bar = oxen_progress_bar(size, ProgressBarType::Counter);
let grouped = self.group_staged_files_to_dirs(&staged_data.staged_files);
log::debug!(
"commit_staged_entries_with_prog got groups {}",
grouped.len()
);
for (dir, files) in grouped.iter() {
let entry_writer = CommitDirEntryWriter::new(&self.repository, &self.commit.id, dir)?;
path_db::put(&self.dir_db, dir, &0)?;
files.par_iter().for_each(|(path, entry)| {
self.commit_staged_entry(&entry_writer, commit, origin_path, path, entry);
bar.inc(1);
});
}
for (_path, staged_dirs) in staged_data.staged_dirs.paths.iter() {
for staged_dir in staged_dirs.iter() {
log::debug!(
"commit_staged_entries_with_prog adding dir {:?} -> {:?}",
staged_dir.path,
staged_dir.status
);
if staged_dir.status == StagedEntryStatus::Removed {
let entry_writer = CommitDirEntryWriter::new(
&self.repository,
&self.commit.id,
&staged_dir.path,
)?;
let num_entries = kv_db::count(&entry_writer.db)?;
if num_entries == 0 {
path_db::delete(&self.dir_db, &staged_dir.path)?;
continue;
}
}
path_db::put(&self.dir_db, &staged_dir.path, &0)?;
}
}
bar.finish_and_clear();
Ok(())
}
fn commit_staged_entry(
&self,
writer: &CommitDirEntryWriter,
commit: &Commit,
origin_path: &Path,
path: &Path,
entry: &StagedEntry,
) {
match entry.status {
StagedEntryStatus::Removed => match writer.remove_path_from_db(path) {
Ok(_) => {}
Err(err) => {
let err = format!("Failed to remove file: {err}");
panic!("{}", err)
}
},
StagedEntryStatus::Modified => {
match self.add_staged_entry_to_db(writer, commit, origin_path, path) {
Ok(_) => {}
Err(err) => {
let err = format!("Failed to commit MODIFIED file: {err}");
panic!("{}", err)
}
}
}
StagedEntryStatus::Added => {
match self.add_staged_entry_to_db(writer, commit, origin_path, path) {
Ok(_) => {}
Err(err) => {
let err = format!("Failed to ADD file: {err}");
panic!("{}", err)
}
}
}
}
}
}
#[cfg(test)]
mod tests {}