use bytesize::ByteSize;
use dragonfly_api::common::v2::Range;
use dragonfly_client_config::dfdaemon::Config;
use dragonfly_client_core::{Error, Result};
use dragonfly_client_util::fs::fallocate;
use std::os::unix::fs::MetadataExt;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::fs::{self, File, OpenOptions};
use tokio::io::{
self, AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufReader, BufWriter, SeekFrom,
};
use tokio_util::io::InspectReader;
use tracing::{debug, error, info, instrument, warn};
use walkdir::WalkDir;
/// Content manages task content files stored under a local content directory.
pub struct Content {
    /// Daemon configuration (storage buffer sizes, GC policy, keep flag).
    pub config: Arc<Config>,
    /// Root content directory; holds the task, persistent-task and
    /// persistent-cache-task subdirectories created in `new`.
    pub dir: PathBuf,
}
impl Content {
/// Initializes the content storage rooted at `dir`, creating the task
/// subdirectories. Unless `config.storage.keep` is set, any existing
/// content directory is removed first (best effort — removal errors are
/// only logged).
pub async fn new(config: Arc<Config>, dir: &Path) -> Result<Content> {
    let dir = dir.join(super::content::DEFAULT_CONTENT_DIR);

    // Wipe stale content unless the daemon is configured to keep it.
    if !config.storage.keep {
        if let Err(err) = fs::remove_dir_all(&dir).await {
            warn!("remove {:?} failed: {}", dir, err);
        }
    }

    // Ensure every content subdirectory exists.
    for sub_dir in [
        super::content::DEFAULT_TASK_DIR,
        super::content::DEFAULT_PERSISTENT_TASK_DIR,
        super::content::DEFAULT_PERSISTENT_CACHE_TASK_DIR,
    ] {
        fs::create_dir_all(dir.join(sub_dir)).await?;
    }

    info!("content initialized directory: {:?}", dir);
    Ok(Content { config, dir })
}
/// Returns the available storage space in bytes.
///
/// When a GC disk threshold is configured, the available space is the
/// threshold minus the bytes already used under the content directory
/// (clamped to zero); otherwise it falls back to the filesystem's free
/// space reported by `statvfs`.
pub fn available_space(&self) -> Result<u64> {
    let disk_threshold = self.config.gc.policy.disk_threshold;
    if disk_threshold != ByteSize::default() {
        // Sum the sizes of all regular files under the content directory.
        // Unreadable entries are skipped rather than failing the walk.
        let usage_space: u64 = WalkDir::new(&self.dir)
            .into_iter()
            .filter_map(|entry| entry.ok())
            .filter_map(|entry| entry.metadata().ok())
            .filter(|metadata| metadata.is_file())
            .map(|metadata| metadata.len())
            .sum();

        if usage_space >= disk_threshold.as_u64() {
            warn!(
                "usage space {} is greater than disk threshold {}, no need to calculate available space",
                usage_space, disk_threshold
            );
            return Ok(0);
        }

        return Ok(disk_threshold.as_u64() - usage_space);
    }

    let stat = fs2::statvfs(&self.dir)?;
    Ok(stat.available_space())
}
/// Returns the total storage space in bytes: the configured GC disk
/// threshold if set, otherwise the filesystem's capacity.
pub fn total_space(&self) -> Result<u64> {
    let disk_threshold = self.config.gc.policy.disk_threshold;
    if disk_threshold == ByteSize::default() {
        let stat = fs2::statvfs(&self.dir)?;
        return Ok(stat.total_space());
    }

    Ok(disk_threshold.as_u64())
}
/// Returns whether there is enough available space to store
/// `content_length` bytes, logging a warning when there is not.
pub fn has_enough_space(&self, content_length: u64) -> Result<bool> {
    let available_space = self.available_space()?;
    if available_space >= content_length {
        return Ok(true);
    }

    warn!(
        "not enough space to store the task: available_space={}, content_length={}",
        available_space, content_length
    );
    Ok(false)
}
/// Returns true when `source` and `target` refer to the same underlying
/// file, i.e. they share both device id and inode number (Unix-specific,
/// via `MetadataExt`). Used to detect pre-existing hard links.
async fn is_same_dev_inode<P: AsRef<Path>, Q: AsRef<Path>>(
    &self,
    source: P,
    target: Q,
) -> Result<bool> {
    let source_metadata = fs::metadata(source).await?;
    let target_metadata = fs::metadata(target).await?;

    // Two paths name the same file iff their (dev, ino) pairs match.
    Ok(source_metadata.dev() == target_metadata.dev()
        && source_metadata.ino() == target_metadata.ino())
}
/// Checks whether `to` is a hard link of the task's content file.
pub async fn is_same_dev_inode_as_task(&self, task_id: &str, to: &Path) -> Result<bool> {
    self.is_same_dev_inode(self.get_task_path(task_id), to).await
}
#[instrument(skip_all)]
pub async fn create_task(&self, task_id: &str, length: u64) -> Result<PathBuf> {
let task_path = self.get_task_path(task_id);
if task_path.exists() {
return Ok(task_path);
}
let task_dir = self
.dir
.join(super::content::DEFAULT_TASK_DIR)
.join(&task_id[..3]);
fs::create_dir_all(&task_dir).await.inspect_err(|err| {
error!("create {:?} failed: {}", task_dir, err);
})?;
let f = fs::File::create(task_dir.join(task_id))
.await
.inspect_err(|err| {
error!("create {:?} failed: {}", task_dir, err);
})?;
fallocate(&f, length).await.inspect_err(|err| {
error!("fallocate {:?} failed: {}", task_dir, err);
})?;
Ok(task_dir.join(task_id))
}
/// Hard links the task content file to `to`.
///
/// If `to` already exists and points at the same inode, this is a no-op;
/// any other failure is returned as `Error::IO`.
#[instrument(skip_all)]
pub async fn hard_link_task(&self, task_id: &str, to: &Path) -> Result<()> {
    let task_path = self.get_task_path(task_id);
    // Borrow `task_path` — `fs::hard_link` takes `impl AsRef<Path>`, so the
    // previous `.clone()` was an unnecessary allocation.
    if let Err(err) = fs::hard_link(&task_path, to).await {
        if err.kind() == std::io::ErrorKind::AlreadyExists {
            // The destination may already be a link to the same file.
            if let Ok(true) = self.is_same_dev_inode(&task_path, to).await {
                info!("hard already exists, no need to operate");
                return Ok(());
            }
        }

        warn!("hard link {:?} to {:?} failed: {}", task_path, to, err);
        return Err(Error::IO(err));
    }

    info!("hard link {:?} to {:?} success", task_path, to);
    Ok(())
}
/// Copies the task content file to `to`.
#[instrument(skip_all)]
pub async fn copy_task(&self, task_id: &str, to: &Path) -> Result<()> {
    let task_path = self.get_task_path(task_id);
    fs::copy(task_path, to).await?;

    info!("copy to {:?} success", to);
    Ok(())
}
/// Removes the task content file from disk.
pub async fn delete_task(&self, task_id: &str) -> Result<()> {
    info!("delete task content: {}", task_id);
    let task_path = self.get_task_path(task_id);
    if let Err(err) = fs::remove_file(task_path.as_path()).await {
        error!("remove {:?} failed: {}", task_path, err);
        return Err(err.into());
    }

    Ok(())
}
/// Opens the task content file and returns a buffered reader over the
/// requested piece, optionally narrowed by `range`.
///
/// The effective (offset, length) is computed by
/// `super::content::calculate_piece_range`, so the returned reader may
/// cover only the intersection of the piece and the range.
#[instrument(skip_all)]
pub async fn read_piece(
    &self,
    task_id: &str,
    offset: u64,
    length: u64,
    range: Option<Range>,
) -> Result<impl AsyncRead> {
    let task_path = self.get_task_path(task_id);

    // Narrow the piece to the requested range, if any.
    let (target_offset, target_length) =
        super::content::calculate_piece_range(offset, length, range);

    let f = File::open(task_path.as_path()).await.inspect_err(|err| {
        error!("open {:?} failed: {}", task_path, err);
    })?;
    let mut f_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, f);

    f_reader
        .seek(SeekFrom::Start(target_offset))
        .await
        .inspect_err(|err| {
            error!("seek {:?} failed: {}", task_path, err);
        })?;

    // Cap the reader at the piece length so callers cannot read past it.
    Ok(f_reader.take(target_length))
}
/// Opens the task content file twice and returns two independent readers
/// for the same piece: the first limited to the `range` intersection, the
/// second covering the full piece.
///
/// Separate file handles are used so the two readers do not share a seek
/// position.
#[instrument(skip_all)]
pub async fn read_piece_with_dual_read(
    &self,
    task_id: &str,
    offset: u64,
    length: u64,
    range: Option<Range>,
) -> Result<(impl AsyncRead, impl AsyncRead)> {
    let task_path = self.get_task_path(task_id);

    // Narrow the piece to the requested range, if any.
    let (target_offset, target_length) =
        super::content::calculate_piece_range(offset, length, range);

    // First reader: range-limited view of the piece.
    let f = File::open(task_path.as_path()).await.inspect_err(|err| {
        error!("open {:?} failed: {}", task_path, err);
    })?;
    let mut f_range_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, f);
    f_range_reader
        .seek(SeekFrom::Start(target_offset))
        .await
        .inspect_err(|err| {
            error!("seek {:?} failed: {}", task_path, err);
        })?;
    let range_reader = f_range_reader.take(target_length);

    // Second reader: the whole piece, independent of the range.
    let f = File::open(task_path.as_path()).await.inspect_err(|err| {
        error!("open {:?} failed: {}", task_path, err);
    })?;
    let mut f_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, f);
    f_reader
        .seek(SeekFrom::Start(offset))
        .await
        .inspect_err(|err| {
            error!("seek {:?} failed: {}", task_path, err);
        })?;
    let reader = f_reader.take(length);

    Ok((range_reader, reader))
}
/// Writes a piece of content from `reader` into the task file at `offset`,
/// returning the number of bytes written and their CRC32 checksum.
///
/// Fails with `Error::Unknown` when fewer than `expected_length` bytes
/// were copied (short read from the source).
#[instrument(skip_all)]
pub async fn write_piece<R: AsyncRead + Unpin + ?Sized>(
    &self,
    task_id: &str,
    offset: u64,
    expected_length: u64,
    reader: &mut R,
) -> Result<super::content::WritePieceResponse> {
    let task_path = self.get_task_path(task_id);

    // Open without truncating: pieces are written at offsets into a file
    // that was preallocated by `create_task`.
    let mut f = OpenOptions::new()
        .truncate(false)
        .write(true)
        .open(task_path.as_path())
        .await
        .inspect_err(|err| {
            error!("open {:?} failed: {}", task_path, err);
        })?;

    f.seek(SeekFrom::Start(offset)).await.inspect_err(|err| {
        error!("seek {:?} failed: {}", task_path, err);
    })?;

    // Cap the reader so a misbehaving source cannot write past the piece.
    let reader = reader.take(expected_length);
    let reader = BufReader::with_capacity(self.config.storage.write_buffer_size, reader);
    let mut writer = BufWriter::with_capacity(self.config.storage.write_buffer_size, f);

    // Tee the stream through a CRC32 hasher while copying to disk, so the
    // checksum is computed in a single pass.
    let mut hasher = crc32fast::Hasher::new();
    let mut tee = InspectReader::new(reader, |bytes| {
        hasher.update(bytes);
    });

    debug!("start to write piece to {:?}", task_path);
    let length = io::copy(&mut tee, &mut writer).await.inspect_err(|err| {
        error!("copy {:?} failed: {}", task_path, err);
    })?;

    // Flush explicitly — BufWriter's Drop would swallow flush errors.
    writer.flush().await.inspect_err(|err| {
        error!("flush {:?} failed: {}", task_path, err);
    })?;
    debug!("finish to write piece to {:?}", task_path);

    // A short copy means the source ended early; surface it as an error.
    if length != expected_length {
        return Err(Error::Unknown(format!(
            "expected length {} but got {}",
            expected_length, length
        )));
    }

    Ok(super::content::WritePieceResponse {
        length,
        hash: hasher.finalize().to_string(),
    })
}
/// Returns the path of the task content file:
/// `<dir>/<task dir>/<first 3 chars of id>/<task id>`.
///
/// NOTE(review): `&task_id[..3]` panics if the id is shorter than three
/// bytes — callers appear to pass fixed-length hash ids; confirm.
fn get_task_path(&self, task_id: &str) -> PathBuf {
    // Shard by id prefix to keep per-directory entry counts small.
    let sub_dir = &task_id[..3];
    self.dir
        .join(super::content::DEFAULT_TASK_DIR)
        .join(sub_dir)
        .join(task_id)
}
/// Checks whether `to` is a hard link of the persistent task's content file.
pub async fn is_same_dev_inode_as_persistent_task(
    &self,
    task_id: &str,
    to: &Path,
) -> Result<bool> {
    self.is_same_dev_inode(self.get_persistent_task_path(task_id), to)
        .await
}
#[instrument(skip_all)]
pub async fn create_persistent_task(&self, task_id: &str, length: u64) -> Result<PathBuf> {
let task_path = self.get_persistent_task_path(task_id);
if task_path.exists() {
return Ok(task_path);
}
let task_dir = self
.dir
.join(super::content::DEFAULT_PERSISTENT_TASK_DIR)
.join(&task_id[..3]);
fs::create_dir_all(&task_dir).await.inspect_err(|err| {
error!("create {:?} failed: {}", task_dir, err);
})?;
let f = fs::File::create(task_dir.join(task_id))
.await
.inspect_err(|err| {
error!("create {:?} failed: {}", task_dir, err);
})?;
fallocate(&f, length).await.inspect_err(|err| {
error!("fallocate {:?} failed: {}", task_dir, err);
})?;
Ok(task_dir.join(task_id))
}
/// Ensures the sharded directory for a persistent task exists and returns
/// the directory path.
///
/// NOTE(review): when the task content *file* already exists this returns
/// the file path (`task_path`), not the directory — inconsistent with the
/// normal return value. Confirm callers expect this before changing it.
#[instrument(skip_all)]
pub async fn create_persistent_task_dir(&self, task_id: &str) -> Result<PathBuf> {
    let task_path = self.get_persistent_task_path(task_id);
    if task_path.exists() {
        return Ok(task_path);
    }

    let task_dir = self
        .dir
        .join(super::content::DEFAULT_PERSISTENT_TASK_DIR)
        .join(&task_id[..3]);
    fs::create_dir_all(&task_dir).await.inspect_err(|err| {
        error!("create {:?} failed: {}", task_dir, err);
    })?;

    Ok(task_dir)
}
/// Hard links the persistent task content file to `to`.
///
/// If `to` already exists and points at the same inode, this is a no-op;
/// any other failure is returned as `Error::IO`.
#[instrument(skip_all)]
pub async fn hard_link_persistent_task(&self, task_id: &str, to: &Path) -> Result<()> {
    let task_path = self.get_persistent_task_path(task_id);
    // Borrow `task_path` — `fs::hard_link` takes `impl AsRef<Path>`, so the
    // previous `.clone()` was an unnecessary allocation.
    if let Err(err) = fs::hard_link(&task_path, to).await {
        if err.kind() == std::io::ErrorKind::AlreadyExists {
            // The destination may already be a link to the same file.
            if let Ok(true) = self.is_same_dev_inode(&task_path, to).await {
                info!("hard already exists, no need to operate");
                return Ok(());
            }
        }

        warn!("hard link {:?} to {:?} failed: {}", task_path, to, err);
        return Err(Error::IO(err));
    }

    info!("hard link {:?} to {:?} success", task_path, to);
    Ok(())
}
/// Hard links an existing file `from` into the store as the persistent
/// task's content file.
///
/// If the destination already exists and points at the same inode, this is
/// a no-op; any other failure is returned as `Error::IO`.
#[instrument(skip_all)]
pub async fn hard_link_to_persistent_task(&self, from: &Path, task_id: &str) -> Result<()> {
    let task_path = self.get_persistent_task_path(task_id);
    if let Err(err) = fs::hard_link(from, &task_path).await {
        if err.kind() == std::io::ErrorKind::AlreadyExists {
            if let Ok(true) = self.is_same_dev_inode(from, &task_path).await {
                info!("hard already exists, no need to operate");
                return Ok(());
            }
        }

        // Log source and target in link order (`from` -> `task_path`);
        // the arguments were previously reversed relative to the success log.
        warn!("hard link {:?} to {:?} failed: {}", from, task_path, err);
        return Err(Error::IO(err));
    }

    info!("hard link {:?} to {:?} success", from, task_path);
    Ok(())
}
/// Copies the persistent task content file to `to`.
#[instrument(skip_all)]
pub async fn copy_persistent_task(&self, task_id: &str, to: &Path) -> Result<()> {
    let task_path = self.get_persistent_task_path(task_id);
    fs::copy(task_path, to).await?;

    info!("copy to {:?} success", to);
    Ok(())
}
/// Opens the persistent task content file and returns a buffered reader
/// over the requested piece, optionally narrowed by `range`.
#[instrument(skip_all)]
pub async fn read_persistent_piece(
    &self,
    task_id: &str,
    offset: u64,
    length: u64,
    range: Option<Range>,
) -> Result<impl AsyncRead> {
    let task_path = self.get_persistent_task_path(task_id);

    // Intersect the piece with the requested range, if any.
    let (target_offset, target_length) =
        super::content::calculate_piece_range(offset, length, range);

    let file = File::open(task_path.as_path()).await.inspect_err(|err| {
        error!("open {:?} failed: {}", task_path, err);
    })?;

    let mut reader = BufReader::with_capacity(self.config.storage.read_buffer_size, file);
    reader
        .seek(SeekFrom::Start(target_offset))
        .await
        .inspect_err(|err| {
            error!("seek {:?} failed: {}", task_path, err);
        })?;

    // Cap the reader at the piece length.
    Ok(reader.take(target_length))
}
/// Writes a piece of content from `reader` into the persistent task file
/// at `offset`, returning the bytes written and their CRC32 checksum.
///
/// Fails with `Error::Unknown` when fewer than `expected_length` bytes
/// were copied (short read from the source).
#[instrument(skip_all)]
pub async fn write_persistent_piece<R: AsyncRead + Unpin + ?Sized>(
    &self,
    task_id: &str,
    offset: u64,
    expected_length: u64,
    reader: &mut R,
) -> Result<super::content::WritePieceResponse> {
    let task_path = self.get_persistent_task_path(task_id);

    // Open without truncating: pieces are written at offsets into a file
    // that was preallocated by `create_persistent_task`.
    let mut f = OpenOptions::new()
        .truncate(false)
        .write(true)
        .open(task_path.as_path())
        .await
        .inspect_err(|err| {
            error!("open {:?} failed: {}", task_path, err);
        })?;

    f.seek(SeekFrom::Start(offset)).await.inspect_err(|err| {
        error!("seek {:?} failed: {}", task_path, err);
    })?;

    // Cap the reader so a misbehaving source cannot write past the piece.
    let reader = reader.take(expected_length);
    let reader = BufReader::with_capacity(self.config.storage.write_buffer_size, reader);
    let mut writer = BufWriter::with_capacity(self.config.storage.write_buffer_size, f);

    // Tee the stream through a CRC32 hasher while copying to disk.
    let mut hasher = crc32fast::Hasher::new();
    let mut tee = InspectReader::new(reader, |bytes| {
        hasher.update(bytes);
    });

    debug!("start to write piece to {:?}", task_path);
    let length = io::copy(&mut tee, &mut writer).await.inspect_err(|err| {
        error!("copy {:?} failed: {}", task_path, err);
    })?;

    // Flush explicitly — BufWriter's Drop would swallow flush errors.
    writer.flush().await.inspect_err(|err| {
        error!("flush {:?} failed: {}", task_path, err);
    })?;
    debug!("finish to write piece to {:?}", task_path);

    // A short copy means the source ended early; surface it as an error.
    if length != expected_length {
        return Err(Error::Unknown(format!(
            "expected length {} but got {}",
            expected_length, length
        )));
    }

    Ok(super::content::WritePieceResponse {
        length,
        hash: hasher.finalize().to_string(),
    })
}
/// Removes the persistent task content file from disk.
pub async fn delete_persistent_task(&self, task_id: &str) -> Result<()> {
    info!("delete persistent task content: {}", task_id);
    let persistent_task_path = self.get_persistent_task_path(task_id);
    if let Err(err) = fs::remove_file(persistent_task_path.as_path()).await {
        error!("remove {:?} failed: {}", persistent_task_path, err);
        return Err(err.into());
    }

    Ok(())
}
/// Returns the path of the persistent task content file, sharded by the
/// first three characters of the task id.
fn get_persistent_task_path(&self, task_id: &str) -> PathBuf {
    let sub_dir = &task_id[..3];
    self.dir
        .join(super::content::DEFAULT_PERSISTENT_TASK_DIR)
        .join(sub_dir)
        .join(task_id)
}
/// Checks whether `to` is a hard link of the persistent cache task's
/// content file.
pub async fn is_same_dev_inode_as_persistent_cache_task(
    &self,
    task_id: &str,
    to: &Path,
) -> Result<bool> {
    self.is_same_dev_inode(self.get_persistent_cache_task_path(task_id), to)
        .await
}
#[instrument(skip_all)]
pub async fn create_persistent_cache_task(
&self,
task_id: &str,
length: u64,
) -> Result<PathBuf> {
let task_path = self.get_persistent_cache_task_path(task_id);
if task_path.exists() {
return Ok(task_path);
}
let task_dir = self
.dir
.join(super::content::DEFAULT_PERSISTENT_CACHE_TASK_DIR)
.join(&task_id[..3]);
fs::create_dir_all(&task_dir).await.inspect_err(|err| {
error!("create {:?} failed: {}", task_dir, err);
})?;
let f = fs::File::create(task_dir.join(task_id))
.await
.inspect_err(|err| {
error!("create {:?} failed: {}", task_dir, err);
})?;
fallocate(&f, length).await.inspect_err(|err| {
error!("fallocate {:?} failed: {}", task_dir, err);
})?;
Ok(task_dir.join(task_id))
}
/// Ensures the sharded directory for a persistent cache task exists and
/// returns the directory path.
///
/// NOTE(review): when the task content *file* already exists this returns
/// the file path (`task_path`), not the directory — inconsistent with the
/// normal return value. Confirm callers expect this before changing it.
#[instrument(skip_all)]
pub async fn create_persistent_cache_task_dir(&self, task_id: &str) -> Result<PathBuf> {
    let task_path = self.get_persistent_cache_task_path(task_id);
    if task_path.exists() {
        return Ok(task_path);
    }

    let task_dir = self
        .dir
        .join(super::content::DEFAULT_PERSISTENT_CACHE_TASK_DIR)
        .join(&task_id[..3]);
    fs::create_dir_all(&task_dir).await.inspect_err(|err| {
        error!("create {:?} failed: {}", task_dir, err);
    })?;

    Ok(task_dir)
}
/// Hard links the persistent cache task content file to `to`.
///
/// If `to` already exists and points at the same inode, this is a no-op;
/// any other failure is returned as `Error::IO`.
#[instrument(skip_all)]
pub async fn hard_link_persistent_cache_task(&self, task_id: &str, to: &Path) -> Result<()> {
    let task_path = self.get_persistent_cache_task_path(task_id);
    // Borrow `task_path` — `fs::hard_link` takes `impl AsRef<Path>`, so the
    // previous `.clone()` was an unnecessary allocation.
    if let Err(err) = fs::hard_link(&task_path, to).await {
        if err.kind() == std::io::ErrorKind::AlreadyExists {
            // The destination may already be a link to the same file.
            if let Ok(true) = self.is_same_dev_inode(&task_path, to).await {
                info!("hard already exists, no need to operate");
                return Ok(());
            }
        }

        warn!("hard link {:?} to {:?} failed: {}", task_path, to, err);
        return Err(Error::IO(err));
    }

    info!("hard link {:?} to {:?} success", task_path, to);
    Ok(())
}
/// Hard links an existing file `from` into the store as the persistent
/// cache task's content file.
///
/// If the destination already exists and points at the same inode, this is
/// a no-op; any other failure is returned as `Error::IO`.
#[instrument(skip_all)]
pub async fn hard_link_to_persistent_cache_task(
    &self,
    from: &Path,
    task_id: &str,
) -> Result<()> {
    let task_path = self.get_persistent_cache_task_path(task_id);
    if let Err(err) = fs::hard_link(from, &task_path).await {
        if err.kind() == std::io::ErrorKind::AlreadyExists {
            if let Ok(true) = self.is_same_dev_inode(from, &task_path).await {
                info!("hard already exists, no need to operate");
                return Ok(());
            }
        }

        // Log source and target in link order (`from` -> `task_path`);
        // the arguments were previously reversed relative to the success log.
        warn!("hard link {:?} to {:?} failed: {}", from, task_path, err);
        return Err(Error::IO(err));
    }

    info!("hard link {:?} to {:?} success", from, task_path);
    Ok(())
}
/// Copies the persistent cache task content file to `to`.
#[instrument(skip_all)]
pub async fn copy_persistent_cache_task(&self, task_id: &str, to: &Path) -> Result<()> {
    let task_path = self.get_persistent_cache_task_path(task_id);
    fs::copy(task_path, to).await?;

    info!("copy to {:?} success", to);
    Ok(())
}
/// Opens the persistent cache task content file and returns a buffered
/// reader over the requested piece, optionally narrowed by `range`.
///
/// The effective (offset, length) is computed by
/// `super::content::calculate_piece_range`.
#[instrument(skip_all)]
pub async fn read_persistent_cache_piece(
    &self,
    task_id: &str,
    offset: u64,
    length: u64,
    range: Option<Range>,
) -> Result<impl AsyncRead> {
    let task_path = self.get_persistent_cache_task_path(task_id);

    // Narrow the piece to the requested range, if any.
    let (target_offset, target_length) =
        super::content::calculate_piece_range(offset, length, range);

    let f = File::open(task_path.as_path()).await.inspect_err(|err| {
        error!("open {:?} failed: {}", task_path, err);
    })?;
    let mut f_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, f);

    f_reader
        .seek(SeekFrom::Start(target_offset))
        .await
        .inspect_err(|err| {
            error!("seek {:?} failed: {}", task_path, err);
        })?;

    // Cap the reader at the piece length.
    Ok(f_reader.take(target_length))
}
/// Writes a piece of content from `reader` into the persistent cache task
/// file at `offset`, returning the bytes written and their CRC32 checksum.
///
/// Fails with `Error::Unknown` when fewer than `expected_length` bytes
/// were copied (short read from the source).
#[instrument(skip_all)]
pub async fn write_persistent_cache_piece<R: AsyncRead + Unpin + ?Sized>(
    &self,
    task_id: &str,
    offset: u64,
    expected_length: u64,
    reader: &mut R,
) -> Result<super::content::WritePieceResponse> {
    let task_path = self.get_persistent_cache_task_path(task_id);

    // Open without truncating: pieces are written at offsets into a file
    // that was preallocated by `create_persistent_cache_task`.
    let mut f = OpenOptions::new()
        .truncate(false)
        .write(true)
        .open(task_path.as_path())
        .await
        .inspect_err(|err| {
            error!("open {:?} failed: {}", task_path, err);
        })?;

    f.seek(SeekFrom::Start(offset)).await.inspect_err(|err| {
        error!("seek {:?} failed: {}", task_path, err);
    })?;

    // Cap the reader so a misbehaving source cannot write past the piece.
    let reader = reader.take(expected_length);
    let reader = BufReader::with_capacity(self.config.storage.write_buffer_size, reader);
    let mut writer = BufWriter::with_capacity(self.config.storage.write_buffer_size, f);

    // Tee the stream through a CRC32 hasher while copying to disk.
    let mut hasher = crc32fast::Hasher::new();
    let mut tee = InspectReader::new(reader, |bytes| {
        hasher.update(bytes);
    });

    debug!("start to write piece to {:?}", task_path);
    let length = io::copy(&mut tee, &mut writer).await.inspect_err(|err| {
        error!("copy {:?} failed: {}", task_path, err);
    })?;

    // Flush explicitly — BufWriter's Drop would swallow flush errors.
    writer.flush().await.inspect_err(|err| {
        error!("flush {:?} failed: {}", task_path, err);
    })?;
    debug!("finish to write piece to {:?}", task_path);

    // A short copy means the source ended early; surface it as an error.
    if length != expected_length {
        return Err(Error::Unknown(format!(
            "expected length {} but got {}",
            expected_length, length
        )));
    }

    Ok(super::content::WritePieceResponse {
        length,
        hash: hasher.finalize().to_string(),
    })
}
/// Removes the persistent cache task content file from disk.
pub async fn delete_persistent_cache_task(&self, task_id: &str) -> Result<()> {
    info!("delete persistent cache task content: {}", task_id);
    let persistent_cache_task_path = self.get_persistent_cache_task_path(task_id);
    if let Err(err) = fs::remove_file(persistent_cache_task_path.as_path()).await {
        error!("remove {:?} failed: {}", persistent_cache_task_path, err);
        return Err(err.into());
    }

    Ok(())
}
/// Returns the path of the persistent cache task content file:
/// `<dir>/<persistent cache task dir>/<first 3 chars of id>/<task id>`.
fn get_persistent_cache_task_path(&self, task_id: &str) -> PathBuf {
    self.dir
        .join(super::content::DEFAULT_PERSISTENT_CACHE_TASK_DIR)
        .join(&task_id[..3])
        .join(task_id)
}
}
#[cfg(test)]
mod tests {
    //! Integration-style tests exercising `Content` against a temporary
    //! directory; each test builds a fresh `Content` with a default config.
    use super::*;
    use crate::content;
    use std::io::Cursor;
    use tempfile::tempdir;

    #[tokio::test]
    async fn test_create_task() {
        let temp_dir = tempdir().unwrap();
        let config = Arc::new(Config::default());
        let content = Content::new(config, temp_dir.path()).await.unwrap();
        let task_id = "60409bd0ec44160f44c53c39b3fe1c5fdfb23faded0228c68bee83bc15a200e3";
        let task_path = content.create_task(task_id, 0).await.unwrap();
        assert!(task_path.exists());
        // Path is sharded under the first three characters of the id.
        assert_eq!(task_path, temp_dir.path().join("content/tasks/604/60409bd0ec44160f44c53c39b3fe1c5fdfb23faded0228c68bee83bc15a200e3"));
        // Creating an existing task returns the same path.
        let task_path_exists = content.create_task(task_id, 0).await.unwrap();
        assert_eq!(task_path, task_path_exists);
    }

    #[tokio::test]
    async fn test_hard_link_task() {
        let temp_dir = tempdir().unwrap();
        let config = Arc::new(Config::default());
        let content = Content::new(config, temp_dir.path()).await.unwrap();
        let task_id = "c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4";
        content.create_task(task_id, 0).await.unwrap();
        let to = temp_dir
            .path()
            .join("c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4");
        content.hard_link_task(task_id, &to).await.unwrap();
        assert!(to.exists());
        // Linking again to the same destination is a no-op, not an error.
        content.hard_link_task(task_id, &to).await.unwrap();
    }

    #[tokio::test]
    async fn test_copy_task() {
        let temp_dir = tempdir().unwrap();
        let config = Arc::new(Config::default());
        let content = Content::new(config, temp_dir.path()).await.unwrap();
        let task_id = "bfd3c02fb31a7373e25b405fd5fd3082987ccfbaf210889153af9e65bbf13002";
        content.create_task(task_id, 64).await.unwrap();
        let to = temp_dir
            .path()
            .join("bfd3c02fb31a7373e25b405fd5fd3082987ccfbaf210889153af9e65bbf13002");
        content.copy_task(task_id, &to).await.unwrap();
        assert!(to.exists());
    }

    #[tokio::test]
    async fn test_delete_task() {
        let temp_dir = tempdir().unwrap();
        let config = Arc::new(Config::default());
        let content = Content::new(config, temp_dir.path()).await.unwrap();
        let task_id = "4e19f03b0fceb38f23ff4f657681472a53ef335db3660ae5494912570b7a2bb7";
        let task_path = content.create_task(task_id, 0).await.unwrap();
        assert!(task_path.exists());
        content.delete_task(task_id).await.unwrap();
        assert!(!task_path.exists());
    }

    #[tokio::test]
    async fn test_read_piece() {
        let temp_dir = tempdir().unwrap();
        let config = Arc::new(Config::default());
        let content = Content::new(config, temp_dir.path()).await.unwrap();
        let task_id = "c794a3bbae81e06d1c8d362509bdd42a7c105b0fb28d80ffe27f94b8f04fc845";
        content.create_task(task_id, 13).await.unwrap();
        let data = b"hello, world!";
        let mut reader = Cursor::new(data);
        content
            .write_piece(task_id, 0, 13, &mut reader)
            .await
            .unwrap();
        // Full piece read, no range.
        let mut reader = content.read_piece(task_id, 0, 13, None).await.unwrap();
        let mut buffer = Vec::new();
        reader.read_to_end(&mut buffer).await.unwrap();
        assert_eq!(buffer, data);
        // Range-limited read returns only the intersection.
        let mut reader = content
            .read_piece(
                task_id,
                0,
                13,
                Some(Range {
                    start: 0,
                    length: 5,
                }),
            )
            .await
            .unwrap();
        let mut buffer = Vec::new();
        reader.read_to_end(&mut buffer).await.unwrap();
        assert_eq!(buffer, b"hello");
    }

    #[tokio::test]
    async fn test_write_piece() {
        let temp_dir = tempdir().unwrap();
        let config = Arc::new(Config::default());
        let content = Content::new(config, temp_dir.path()).await.unwrap();
        let task_id = "60b48845606946cea72084f14ed5cce61ec96e69f80a30f891a6963dccfd5b4f";
        content.create_task(task_id, 4).await.unwrap();
        let data = b"test";
        let mut reader = Cursor::new(data);
        let response = content
            .write_piece(task_id, 0, 4, &mut reader)
            .await
            .unwrap();
        assert_eq!(response.length, 4);
        assert!(!response.hash.is_empty());
    }

    #[tokio::test]
    async fn test_create_persistent_task() {
        let temp_dir = tempdir().unwrap();
        let config = Arc::new(Config::default());
        let content = Content::new(config, temp_dir.path()).await.unwrap();
        let task_id = "c4f108ab1d2b8cfdffe89ea9676af35123fa02e3c25167d62538f630d5d44745";
        let task_path = content.create_persistent_task(task_id, 0).await.unwrap();
        assert!(task_path.exists());
        assert_eq!(task_path, temp_dir.path().join("content/persistent-tasks/c4f/c4f108ab1d2b8cfdffe89ea9676af35123fa02e3c25167d62538f630d5d44745"));
        let task_path_exists = content.create_persistent_task(task_id, 0).await.unwrap();
        assert_eq!(task_path, task_path_exists);
    }

    #[tokio::test]
    async fn test_hard_link_persistent_task() {
        let temp_dir = tempdir().unwrap();
        let config = Arc::new(Config::default());
        let content = Content::new(config, temp_dir.path()).await.unwrap();
        let task_id = "5e81970eb2b048910cc84cab026b951f2ceac0a09c72c0717193bb6e466e11cd";
        content.create_persistent_task(task_id, 0).await.unwrap();
        let to = temp_dir
            .path()
            .join("5e81970eb2b048910cc84cab026b951f2ceac0a09c72c0717193bb6e466e11cd");
        content
            .hard_link_persistent_task(task_id, &to)
            .await
            .unwrap();
        assert!(to.exists());
        // Second link to the same destination is a no-op.
        content
            .hard_link_persistent_task(task_id, &to)
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn test_copy_persistent_task() {
        let temp_dir = tempdir().unwrap();
        let config = Arc::new(Config::default());
        let content = Content::new(config, temp_dir.path()).await.unwrap();
        let task_id = "194b9c2018429689fb4e596a506c7e9db564c187b9709b55b33b96881dfb6dd5";
        content.create_persistent_task(task_id, 64).await.unwrap();
        let to = temp_dir
            .path()
            .join("194b9c2018429689fb4e596a506c7e9db564c187b9709b55b33b96881dfb6dd5");
        content.copy_persistent_task(task_id, &to).await.unwrap();
        assert!(to.exists());
    }

    #[tokio::test]
    async fn test_delete_persistent_task() {
        let temp_dir = tempdir().unwrap();
        let config = Arc::new(Config::default());
        let content = Content::new(config, temp_dir.path()).await.unwrap();
        let task_id = "17430ba545c3ce82790e9c9f77e64dca44bb6d6a0c9e18be175037c16c73713d";
        let task_path = content.create_persistent_task(task_id, 0).await.unwrap();
        assert!(task_path.exists());
        content.delete_persistent_task(task_id).await.unwrap();
        assert!(!task_path.exists());
    }

    #[tokio::test]
    async fn test_read_persistent_piece() {
        let temp_dir = tempdir().unwrap();
        let config = Arc::new(Config::default());
        let content = Content::new(config, temp_dir.path()).await.unwrap();
        let task_id = "9cb27a4af09aee4eb9f904170217659683f4a0ea7cd55e1a9fbcb99ddced659a";
        content.create_persistent_task(task_id, 13).await.unwrap();
        let data = b"hello, world!";
        let mut reader = Cursor::new(data);
        content
            .write_persistent_piece(task_id, 0, 13, &mut reader)
            .await
            .unwrap();
        // Full piece read, no range.
        let mut reader = content
            .read_persistent_piece(task_id, 0, 13, None)
            .await
            .unwrap();
        let mut buffer = Vec::new();
        reader.read_to_end(&mut buffer).await.unwrap();
        assert_eq!(buffer, data);
        // Range-limited read returns only the intersection.
        let mut reader = content
            .read_persistent_piece(
                task_id,
                0,
                13,
                Some(Range {
                    start: 0,
                    length: 5,
                }),
            )
            .await
            .unwrap();
        let mut buffer = Vec::new();
        reader.read_to_end(&mut buffer).await.unwrap();
        assert_eq!(buffer, b"hello");
    }

    #[tokio::test]
    async fn test_write_persistent_piece() {
        let temp_dir = tempdir().unwrap();
        let config = Arc::new(Config::default());
        let content = Content::new(config, temp_dir.path()).await.unwrap();
        let task_id = "ca1afaf856e8a667fbd48093ca3ca1b8eeb4bf735912fbe551676bc5817a720a";
        content.create_persistent_task(task_id, 4).await.unwrap();
        let data = b"test";
        let mut reader = Cursor::new(data);
        let response = content
            .write_persistent_piece(task_id, 0, 4, &mut reader)
            .await
            .unwrap();
        assert_eq!(response.length, 4);
        assert!(!response.hash.is_empty());
    }

    #[tokio::test]
    async fn test_create_persistent_cache_task() {
        let temp_dir = tempdir().unwrap();
        let config = Arc::new(Config::default());
        let content = Content::new(config, temp_dir.path()).await.unwrap();
        let task_id = "c4f108ab1d2b8cfdffe89ea9676af35123fa02e3c25167d62538f630d5d44745";
        let task_path = content
            .create_persistent_cache_task(task_id, 0)
            .await
            .unwrap();
        assert!(task_path.exists());
        assert_eq!(task_path, temp_dir.path().join("content/persistent-cache-tasks/c4f/c4f108ab1d2b8cfdffe89ea9676af35123fa02e3c25167d62538f630d5d44745"));
        let task_path_exists = content
            .create_persistent_cache_task(task_id, 0)
            .await
            .unwrap();
        assert_eq!(task_path, task_path_exists);
    }

    #[tokio::test]
    async fn test_hard_link_persistent_cache_task() {
        let temp_dir = tempdir().unwrap();
        let config = Arc::new(Config::default());
        let content = Content::new(config, temp_dir.path()).await.unwrap();
        let task_id = "5e81970eb2b048910cc84cab026b951f2ceac0a09c72c0717193bb6e466e11cd";
        content
            .create_persistent_cache_task(task_id, 0)
            .await
            .unwrap();
        let to = temp_dir
            .path()
            .join("5e81970eb2b048910cc84cab026b951f2ceac0a09c72c0717193bb6e466e11cd");
        content
            .hard_link_persistent_cache_task(task_id, &to)
            .await
            .unwrap();
        assert!(to.exists());
        // Second link to the same destination is a no-op.
        content
            .hard_link_persistent_cache_task(task_id, &to)
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn test_copy_persistent_cache_task() {
        let temp_dir = tempdir().unwrap();
        let config = Arc::new(Config::default());
        let content = Content::new(config, temp_dir.path()).await.unwrap();
        let task_id = "194b9c2018429689fb4e596a506c7e9db564c187b9709b55b33b96881dfb6dd5";
        content
            .create_persistent_cache_task(task_id, 64)
            .await
            .unwrap();
        let to = temp_dir
            .path()
            .join("194b9c2018429689fb4e596a506c7e9db564c187b9709b55b33b96881dfb6dd5");
        content
            .copy_persistent_cache_task(task_id, &to)
            .await
            .unwrap();
        assert!(to.exists());
    }

    #[tokio::test]
    async fn test_delete_persistent_cache_task() {
        let temp_dir = tempdir().unwrap();
        let config = Arc::new(Config::default());
        let content = Content::new(config, temp_dir.path()).await.unwrap();
        let task_id = "17430ba545c3ce82790e9c9f77e64dca44bb6d6a0c9e18be175037c16c73713d";
        let task_path = content
            .create_persistent_cache_task(task_id, 0)
            .await
            .unwrap();
        assert!(task_path.exists());
        content.delete_persistent_cache_task(task_id).await.unwrap();
        assert!(!task_path.exists());
    }

    #[tokio::test]
    async fn test_read_persistent_cache_piece() {
        let temp_dir = tempdir().unwrap();
        let config = Arc::new(Config::default());
        let content = Content::new(config, temp_dir.path()).await.unwrap();
        let task_id = "9cb27a4af09aee4eb9f904170217659683f4a0ea7cd55e1a9fbcb99ddced659a";
        content
            .create_persistent_cache_task(task_id, 13)
            .await
            .unwrap();
        let data = b"hello, world!";
        let mut reader = Cursor::new(data);
        content
            .write_persistent_cache_piece(task_id, 0, 13, &mut reader)
            .await
            .unwrap();
        // Full piece read, no range.
        let mut reader = content
            .read_persistent_cache_piece(task_id, 0, 13, None)
            .await
            .unwrap();
        let mut buffer = Vec::new();
        reader.read_to_end(&mut buffer).await.unwrap();
        assert_eq!(buffer, data);
        // Range-limited read returns only the intersection.
        let mut reader = content
            .read_persistent_cache_piece(
                task_id,
                0,
                13,
                Some(Range {
                    start: 0,
                    length: 5,
                }),
            )
            .await
            .unwrap();
        let mut buffer = Vec::new();
        reader.read_to_end(&mut buffer).await.unwrap();
        assert_eq!(buffer, b"hello");
    }

    #[tokio::test]
    async fn test_write_persistent_cache_piece() {
        let temp_dir = tempdir().unwrap();
        let config = Arc::new(Config::default());
        let content = Content::new(config, temp_dir.path()).await.unwrap();
        let task_id = "ca1afaf856e8a667fbd48093ca3ca1b8eeb4bf735912fbe551676bc5817a720a";
        content
            .create_persistent_cache_task(task_id, 4)
            .await
            .unwrap();
        let data = b"test";
        let mut reader = Cursor::new(data);
        let response = content
            .write_persistent_cache_piece(task_id, 0, 4, &mut reader)
            .await
            .unwrap();
        assert_eq!(response.length, 4);
        assert!(!response.hash.is_empty());
    }

    #[tokio::test]
    async fn test_has_enough_space() {
        let config = Arc::new(Config::default());
        let temp_dir = tempdir().unwrap();
        let content = Content::new(config, temp_dir.path()).await.unwrap();
        // With no disk threshold, falls back to filesystem free space.
        let has_space = content.has_enough_space(1).unwrap();
        assert!(has_space);
        let has_space = content.has_enough_space(u64::MAX).unwrap();
        assert!(!has_space);
        // With a 10 MiB threshold and 1 MiB already used, exactly 9 MiB
        // remains available.
        let mut config = Config::default();
        config.gc.policy.disk_threshold = ByteSize::mib(10);
        let config = Arc::new(config);
        let content = Content::new(config, temp_dir.path()).await.unwrap();
        let file_path = Path::new(temp_dir.path())
            .join(content::DEFAULT_CONTENT_DIR)
            .join(content::DEFAULT_TASK_DIR)
            .join("1mib");
        let mut file = File::create(&file_path).await.unwrap();
        let buffer = vec![0u8; ByteSize::mib(1).as_u64() as usize];
        file.write_all(&buffer).await.unwrap();
        file.flush().await.unwrap();
        let has_space = content
            .has_enough_space(ByteSize::mib(9).as_u64() + 1)
            .unwrap();
        assert!(!has_space);
        let has_space = content.has_enough_space(ByteSize::mib(9).as_u64()).unwrap();
        assert!(has_space);
    }
}