mod file;
use std::{
cmp::min,
fs::{self, File},
path::{Path, PathBuf},
};
use cloud_filter::{
error::{CResult, CloudErrorKind},
filter::{info, ticket, Filter, Request},
metadata::Metadata,
placeholder::{ConvertOptions, Placeholder},
placeholder_file::PlaceholderFile,
utility::{FileTime, WriteAt},
};
use file::FileBlob;
use futures::StreamExt;
use opendal::{Entry, Operator};
const BUF_SIZE: usize = 65536;
pub struct CloudFilter {
op: Operator,
root: PathBuf,
}
impl CloudFilter {
pub fn new(op: Operator, root: PathBuf) -> Self {
Self { op, root }
}
}
impl Filter for CloudFilter {
async fn fetch_data(
&self,
request: Request,
ticket: ticket::FetchData,
info: info::FetchData,
) -> CResult<()> {
log::debug!("fetch_data: {}", request.path().display());
let _blob = bincode::deserialize::<FileBlob>(request.file_blob()).map_err(|e| {
log::warn!("failed to deserialize file blob: {e}");
CloudErrorKind::ValidationFailed
})?;
let range = info.required_file_range();
let path = request.path();
let remote_path = path
.strip_prefix(&self.root)
.map_err(|_| CloudErrorKind::NotUnderSyncRoot)?;
let reader = self
.op
.reader_with(&remote_path.to_string_lossy().replace('\\', "/"))
.await
.map_err(|e| {
log::warn!("failed to open file: {e}");
CloudErrorKind::Unsuccessful
})?;
let mut position = range.start;
let mut buffer = Vec::with_capacity(BUF_SIZE);
loop {
let mut bytes_read = reader
.read_into(
&mut buffer,
position..min(range.end, position + BUF_SIZE as u64),
)
.await
.map_err(|e| {
log::warn!("failed to read file: {e}");
CloudErrorKind::Unsuccessful
})?;
let unaligned = bytes_read % 4096;
if unaligned != 0 && position + (bytes_read as u64) < range.end {
bytes_read -= unaligned;
}
ticket
.write_at(&buffer[..bytes_read], position)
.map_err(|e| {
log::warn!("failed to write file: {e}");
CloudErrorKind::Unsuccessful
})?;
position += bytes_read as u64;
if position >= range.end {
break;
}
buffer.clear();
ticket.report_progress(range.end, position).map_err(|e| {
log::warn!("failed to report progress: {e}");
CloudErrorKind::Unsuccessful
})?;
}
Ok(())
}
async fn fetch_placeholders(
&self,
request: Request,
ticket: ticket::FetchPlaceholders,
_info: info::FetchPlaceholders,
) -> CResult<()> {
log::debug!("fetch_placeholders: {}", request.path().display());
let absolute = request.path();
let mut remote_path = absolute
.strip_prefix(&self.root)
.map_err(|_| CloudErrorKind::NotUnderSyncRoot)?
.to_owned();
remote_path.push("");
let now = FileTime::now();
let mut entries = self
.op
.lister_with(&remote_path.to_string_lossy().replace('\\', "/"))
.await
.map_err(|e| {
log::warn!("failed to list files: {e}");
CloudErrorKind::Unsuccessful
})?
.filter_map(|e| async {
let entry = e.ok()?;
let metadata = self.op.stat(entry.path()).await.ok()?;
let entry_remote_path = PathBuf::from(entry.path());
let relative_path = entry_remote_path
.strip_prefix(&remote_path)
.expect("valid path");
check_in_sync(&entry, &self.root).then(|| {
PlaceholderFile::new(relative_path)
.metadata(
match entry.metadata().is_dir() {
true => Metadata::directory(),
false => Metadata::file(),
}
.size(metadata.content_length())
.written(
FileTime::from_unix_time(
metadata.last_modified().unwrap_or_default().timestamp(),
)
.expect("valid time"),
)
.created(now),
)
.mark_in_sync()
.blob(
bincode::serialize(&FileBlob {
..Default::default()
})
.expect("valid blob"),
)
})
})
.collect::<Vec<_>>()
.await;
_ = ticket.pass_with_placeholder(&mut entries).map_err(|e| {
log::warn!("failed to pass placeholder: {e:?}");
});
Ok(())
}
}
fn check_in_sync(entry: &Entry, root: &Path) -> bool {
let absolute = root.join(entry.path());
let Ok(metadata) = fs::metadata(&absolute) else {
return true;
};
if metadata.is_dir() != entry.metadata().is_dir() {
return false;
} else if metadata.is_file() {
if entry.metadata().content_length() != metadata.len() {
return false;
}
}
if metadata.is_dir() {
let mut placeholder = Placeholder::open(absolute).unwrap();
_ = placeholder
.convert_to_placeholder(
ConvertOptions::default()
.mark_in_sync()
.has_children()
.blob(
bincode::serialize(&FileBlob {
..Default::default()
})
.expect("valid blob"),
),
None,
)
.map_err(|e| {
log::error!("failed to convert to placeholder: {e:?}");
});
} else {
let mut placeholder = Placeholder::from(File::open(absolute).unwrap());
_ = placeholder
.convert_to_placeholder(
ConvertOptions::default().mark_in_sync().blob(
bincode::serialize(&FileBlob {
..Default::default()
})
.expect("valid blob"),
),
None,
)
.map_err(|e| log::error!("failed to convert to placeholder: {e:?}"));
}
false
}