use std::cmp;
use std::io::{Read, Write};
use std::time::Duration;
use crate::error::{Error, Result};
use crate::retry::RetryPolicy;
use crate::rpc::{AuthSys, max_record_size_for_payloads};
use crate::v3::client::{
advance_offset, append_dir_entries, clamp_dir_size, clamp_io_size, dir_page_from_batch,
join_path, normalize_path, temporary_sibling_path, validate_max_dir_entries,
validate_transfer_size,
};
use crate::v3::mount::{MOUNT_PROGRAM, MOUNT_VERSION, MountClient};
use crate::v3::portmap;
use crate::v3::proto::{
AccessResult, CommitResult, CreateHow, FileAttr, FileHandle, FileType, FsInfo, FsStat,
MAX_DIR_ENTRIES, NFS_PORT, NFS_PROGRAM, NFS_VERSION, Nfs3Client, NfsStatus, NfsTime, PathConf,
SetAttr, StableHow,
};
pub use crate::v3::client::{DirEntry, DirPage, DirPageCursor, RemoteTarget};
/// Default limit assigned to both the read and the write size of a new builder.
const DEFAULT_IO_SIZE: u32 = 128 * 1024;
/// Default limit assigned to the directory read size of a new builder.
const DEFAULT_DIR_SIZE: u32 = 128 * 1024;
/// Collects connection settings for a [`Client`] before connecting.
#[derive(Debug, Clone)]
pub struct ClientBuilder {
/// Host and export to mount.
target: RemoteTarget,
/// Credentials handed to both the MOUNT and the NFS connection.
auth: AuthSys,
/// Timeout passed to the portmap lookups and to both connections.
timeout: Option<Duration>,
/// Explicit MOUNT port; when `None` the port is looked up through portmap.
mount_port: Option<u16>,
/// Explicit NFS port; when `None` the port is looked up through portmap.
nfs_port: Option<u16>,
/// Requested upper bound for READ sizes (validated at connect time).
read_size_limit: u32,
/// Requested upper bound for WRITE sizes (validated at connect time).
write_size_limit: u32,
/// Requested upper bound for directory read sizes (validated at connect time).
dir_size_limit: u32,
/// Maximum number of directory entries a listing may return.
max_dir_entries: usize,
/// Retry policy installed on the NFS connection.
retry_policy: RetryPolicy,
}
impl ClientBuilder {
pub fn new(host: impl Into<String>, export: impl Into<String>) -> Self {
Self {
target: RemoteTarget {
host: host.into(),
export: export.into(),
},
auth: AuthSys::current(),
timeout: Some(Duration::from_secs(30)),
mount_port: None,
nfs_port: None,
read_size_limit: DEFAULT_IO_SIZE,
write_size_limit: DEFAULT_IO_SIZE,
dir_size_limit: DEFAULT_DIR_SIZE,
max_dir_entries: MAX_DIR_ENTRIES,
retry_policy: RetryPolicy::default(),
}
}
pub fn from_target(target: &str) -> Result<Self> {
Ok(Self {
target: RemoteTarget::parse(target)?,
auth: AuthSys::current(),
timeout: Some(Duration::from_secs(30)),
mount_port: None,
nfs_port: None,
read_size_limit: DEFAULT_IO_SIZE,
write_size_limit: DEFAULT_IO_SIZE,
dir_size_limit: DEFAULT_DIR_SIZE,
max_dir_entries: MAX_DIR_ENTRIES,
retry_policy: RetryPolicy::default(),
})
}
pub fn auth_sys(mut self, auth: AuthSys) -> Self {
self.auth = auth;
self
}
pub fn timeout(mut self, timeout: Option<Duration>) -> Self {
self.timeout = timeout;
self
}
pub fn mount_port(mut self, port: u16) -> Self {
self.mount_port = Some(port);
self
}
pub fn nfs_port(mut self, port: u16) -> Self {
self.nfs_port = Some(port);
self
}
pub fn io_size(mut self, size: u32) -> Self {
self.read_size_limit = size;
self.write_size_limit = size;
self
}
pub fn read_size(mut self, size: u32) -> Self {
self.read_size_limit = size;
self
}
pub fn write_size(mut self, size: u32) -> Self {
self.write_size_limit = size;
self
}
pub fn dir_size(mut self, size: u32) -> Self {
self.dir_size_limit = size;
self
}
pub fn max_dir_entries(mut self, max_dir_entries: usize) -> Self {
self.max_dir_entries = max_dir_entries;
self
}
pub fn retry_policy(mut self, retry_policy: RetryPolicy) -> Self {
self.retry_policy = retry_policy;
self
}
pub fn connect(self) -> Result<Client> {
Client::connect_with_builder(self)
}
}
/// A mounted NFSv3 export together with the connection used to operate on it.
#[derive(Debug)]
pub struct Client {
/// Copy of the builder this client was created from; reused by `reconnect` and `unmount`.
builder: ClientBuilder,
/// Host and export that were mounted.
target: RemoteTarget,
/// Credentials used for the connections.
auth: AuthSys,
/// MOUNT port that was used while connecting; `unmount` connects to it again.
mount_port: u16,
/// File handle returned by the MOUNT call; path resolution starts here.
root: FileHandle,
/// NFS connection used for every file operation.
nfs: Nfs3Client,
/// FSINFO reply captured while connecting, or `None` when that call failed.
fsinfo: Option<FsInfo>,
/// Effective size used for READ requests.
read_size: u32,
/// Effective upper bound for a single WRITE request.
write_size: u32,
/// Effective size used for directory read requests.
dir_size: u32,
/// Maximum number of entries a directory listing may return.
max_dir_entries: usize,
}
impl Client {
/// Parses `target` and connects with the default builder settings.
pub fn connect(target: &str) -> Result<Self> {
    let builder = ClientBuilder::from_target(target)?;
    builder.connect()
}
/// Returns a [`ClientBuilder`] for `export` on `host`; shorthand for `ClientBuilder::new`.
pub fn builder(host: impl Into<String>, export: impl Into<String>) -> ClientBuilder {
ClientBuilder::new(host, export)
}
/// Returns the FSINFO reply captured while connecting, or `None` when that call failed.
pub fn fsinfo(&self) -> Option<&FsInfo> {
self.fsinfo.as_ref()
}
/// Connects again from the stored builder and replaces `self` only when that succeeds.
pub fn reconnect(&mut self) -> Result<()> {
    let fresh = Self::connect_with_builder(self.builder.clone())?;
    *self = fresh;
    Ok(())
}
/// Resolves `path` and issues FSSTAT on the resulting handle.
pub fn fsstat(&mut self, path: &str) -> Result<FsStat> {
    let fh = self.resolve(path)?;
    self.nfs.fsstat(&fh)
}
/// Resolves `path` and issues PATHCONF on the resulting handle.
pub fn pathconf(&mut self, path: &str) -> Result<PathConf> {
    let fh = self.resolve(path)?;
    self.nfs.pathconf(&fh)
}
/// Resolves `path` and returns the attributes reported by GETATTR.
pub fn metadata(&mut self, path: &str) -> Result<FileAttr> {
    let fh = self.resolve(path)?;
    self.nfs.getattr(&fh)
}
/// Returns the `file_type` field of the attributes of `path`.
pub fn file_type(&mut self, path: &str) -> Result<FileType> {
    self.metadata(path).map(|attr| attr.file_type)
}
/// Reports whether the file type of `path` answers `is_file()`.
pub fn is_file(&mut self, path: &str) -> Result<bool> {
    let kind = self.file_type(path)?;
    Ok(kind.is_file())
}
/// Reports whether the file type of `path` answers `is_dir()`.
pub fn is_dir(&mut self, path: &str) -> Result<bool> {
    let kind = self.file_type(path)?;
    Ok(kind.is_dir())
}
/// Reports whether the file type of `path` answers `is_symlink()`.
pub fn is_symlink(&mut self, path: &str) -> Result<bool> {
    let kind = self.file_type(path)?;
    Ok(kind.is_symlink())
}
/// Resolves `path` and forwards the `access` value to the ACCESS call.
pub fn access(&mut self, path: &str, access: u32) -> Result<AccessResult> {
    let fh = self.resolve(path)?;
    self.nfs.access(&fh, access)
}
/// Resolves `path` and discards the handle; succeeds only when every component resolves.
pub fn lookup(&mut self, path: &str) -> Result<()> {
    self.resolve(path)?;
    Ok(())
}
/// Reports whether `path` resolves.
///
/// An NFS `NoEnt` status becomes `Ok(false)`; every other error is returned unchanged.
pub fn exists(&mut self, path: &str) -> Result<bool> {
    match self.resolve(path) {
        Err(Error::Nfs {
            status: NfsStatus::NoEnt,
            ..
        }) => Ok(false),
        outcome => outcome.map(|_| true),
    }
}
/// Reads the file at `path` from offset zero until the server reports EOF.
pub fn read(&mut self, path: &str) -> Result<Vec<u8>> {
    let fh = self.resolve(path)?;
    self.read_handle_to_end(&fh)
}
/// Streams the file at `path` into `writer` and returns the number of bytes written.
pub fn read_to_writer<W: Write + ?Sized>(&mut self, path: &str, writer: &mut W) -> Result<u64> {
    let fh = self.resolve(path)?;
    self.read_handle_to_writer(&fh, writer)
}
/// Streams at most `count` bytes starting at `offset` from `path` into `writer`.
///
/// Returns the number of bytes written, which is smaller than `count` when EOF is reached.
pub fn read_range_to_writer<W: Write + ?Sized>(
    &mut self,
    path: &str,
    offset: u64,
    count: u64,
    writer: &mut W,
) -> Result<u64> {
    let fh = self.resolve(path)?;
    self.read_handle_range_to_writer(&fh, offset, count, writer)
}
/// Reads at most `count` bytes starting at `offset` from the file at `path`.
pub fn read_range(&mut self, path: &str, offset: u64, count: u64) -> Result<Vec<u8>> {
    let fh = self.resolve(path)?;
    self.read_handle_range(&fh, offset, count)
}
/// Reads at most `count` bytes starting at `offset`; the result is shorter at EOF.
pub fn read_at(&mut self, path: &str, offset: u64, count: u32) -> Result<Vec<u8>> {
    let fh = self.resolve(path)?;
    self.read_handle_at(&fh, offset, count)
}
pub fn read_exact_at(&mut self, path: &str, offset: u64, count: u32) -> Result<Vec<u8>> {
let data = self.read_at(path, offset, count)?;
if data.len() != count as usize {
return Err(Error::Protocol(format!(
"READ returned {} bytes before EOF; expected {count}",
data.len()
)));
}
Ok(data)
}
/// Resolves `path` and returns the string reported by READLINK.
pub fn read_link(&mut self, path: &str) -> Result<String> {
    let fh = self.resolve(path)?;
    self.nfs.readlink(&fh)
}
/// Replaces the contents of `path` with `data` using `StableHow::FileSync`.
pub fn write(&mut self, path: &str, data: &[u8]) -> Result<()> {
    let stable = StableHow::FileSync;
    self.write_with_stability(path, data, stable)
}
/// Replaces the contents of `path` with `data`, creating the file when it is missing.
///
/// An existing file is reused; when resolution fails with `NoEnt` the file is created
/// with `CreateHow::Unchecked` and mode `0o644`. The file is then truncated to zero
/// length and `data` is written from offset zero with the requested `stable` level.
///
/// Creation goes through `create_handle_with_how`, the same helper `create_unchecked`
/// uses, so a CREATE reply without a handle is reported with that helper's message.
///
/// # Errors
///
/// Returns any resolution error other than `NoEnt`, and any error from CREATE,
/// SETATTR or the write loop.
pub fn write_with_stability(
    &mut self,
    path: &str,
    data: &[u8],
    stable: StableHow,
) -> Result<()> {
    let handle = match self.resolve(path) {
        Ok(handle) => handle,
        Err(Error::Nfs {
            status: NfsStatus::NoEnt,
            ..
        }) => self.create_handle_with_how(
            path,
            CreateHow::Unchecked(SetAttr {
                mode: Some(0o644),
                ..SetAttr::default()
            }),
        )?,
        Err(err) => return Err(err),
    };
    // Truncate first so bytes beyond `data.len()` from a previous version do not survive.
    self.nfs.setattr(&handle, &SetAttr::size(0))?;
    self.write_handle_all(&handle, 0, data, stable)
}
/// Replaces the contents of `path` with the bytes of `reader` using `StableHow::FileSync`.
pub fn write_from_reader<R: Read + ?Sized>(
    &mut self,
    path: &str,
    reader: &mut R,
) -> Result<u64> {
    let stable = StableHow::FileSync;
    self.write_from_reader_with_stability(path, reader, stable)
}
/// Replaces the contents of `path` with the bytes of `reader`, creating the file when
/// it is missing, and returns the number of bytes written.
///
/// An existing file is reused; when resolution fails with `NoEnt` the file is created
/// with `CreateHow::Unchecked` and mode `0o644`. The file is then truncated to zero
/// length and the reader is drained into it from offset zero.
///
/// Creation goes through `create_handle_with_how`, the same helper `create_unchecked`
/// uses, so a CREATE reply without a handle is reported with that helper's message.
///
/// # Errors
///
/// Returns any resolution error other than `NoEnt`, and any error from CREATE,
/// SETATTR, the reader or the write loop.
pub fn write_from_reader_with_stability<R: Read + ?Sized>(
    &mut self,
    path: &str,
    reader: &mut R,
    stable: StableHow,
) -> Result<u64> {
    let handle = match self.resolve(path) {
        Ok(handle) => handle,
        Err(Error::Nfs {
            status: NfsStatus::NoEnt,
            ..
        }) => self.create_handle_with_how(
            path,
            CreateHow::Unchecked(SetAttr {
                mode: Some(0o644),
                ..SetAttr::default()
            }),
        )?,
        Err(err) => return Err(err),
    };
    // Truncate first so stale bytes past the end of the new content do not survive.
    self.nfs.setattr(&handle, &SetAttr::size(0))?;
    self.write_handle_from_reader(&handle, reader, stable)
}
/// Atomic write with mode `0o644` and `StableHow::FileSync`.
pub fn write_atomic(&mut self, path: &str, data: &[u8]) -> Result<()> {
    let (mode, stable) = (0o644, StableHow::FileSync);
    self.write_atomic_with_mode_and_stability(path, data, mode, stable)
}
/// Atomic write with the given `mode` and `StableHow::FileSync`.
pub fn write_atomic_with_mode(&mut self, path: &str, data: &[u8], mode: u32) -> Result<()> {
    let stable = StableHow::FileSync;
    self.write_atomic_with_mode_and_stability(path, data, mode, stable)
}
/// Atomic write with mode `0o644` and the given `stable` level.
pub fn write_atomic_with_stability(
    &mut self,
    path: &str,
    data: &[u8],
    stable: StableHow,
) -> Result<()> {
    let mode = 0o644;
    self.write_atomic_with_mode_and_stability(path, data, mode, stable)
}
/// Writes `data` to the path returned by `temporary_sibling_path(path)` and renames
/// that file onto `path`.
///
/// When the temporary file was created but the write or the rename fails, removal of
/// the temporary file is attempted (its own failure is ignored) and the original error
/// is returned. A failed creation leaves nothing to clean up.
pub fn write_atomic_with_mode_and_stability(
    &mut self,
    path: &str,
    data: &[u8],
    mode: u32,
    stable: StableHow,
) -> Result<()> {
    let temp = temporary_sibling_path(path)?;
    let handle = self.create_handle_new(&temp, mode)?;
    let mut outcome = self.write_handle_all(&handle, 0, data, stable);
    if outcome.is_ok() {
        outcome = self.rename(&temp, path);
    }
    if outcome.is_err() {
        // Best effort: the caller cares about the original failure, not the cleanup.
        let _ = self.remove(&temp);
    }
    outcome
}
/// Atomic write from `reader` with mode `0o644` and `StableHow::FileSync`.
pub fn write_atomic_from_reader<R: Read + ?Sized>(
    &mut self,
    path: &str,
    reader: &mut R,
) -> Result<u64> {
    let (mode, stable) = (0o644, StableHow::FileSync);
    self.write_atomic_from_reader_with_mode_and_stability(path, reader, mode, stable)
}
/// Atomic write from `reader` with the given `mode` and `StableHow::FileSync`.
pub fn write_atomic_from_reader_with_mode<R: Read + ?Sized>(
    &mut self,
    path: &str,
    reader: &mut R,
    mode: u32,
) -> Result<u64> {
    let stable = StableHow::FileSync;
    self.write_atomic_from_reader_with_mode_and_stability(path, reader, mode, stable)
}
/// Atomic write from `reader` with mode `0o644` and the given `stable` level.
pub fn write_atomic_from_reader_with_stability<R: Read + ?Sized>(
    &mut self,
    path: &str,
    reader: &mut R,
    stable: StableHow,
) -> Result<u64> {
    let mode = 0o644;
    self.write_atomic_from_reader_with_mode_and_stability(path, reader, mode, stable)
}
/// Drains `reader` into the path returned by `temporary_sibling_path(path)`, renames
/// that file onto `path` and returns the number of bytes written.
///
/// When the temporary file was created but the write or the rename fails, removal of
/// the temporary file is attempted (its own failure is ignored) and the original error
/// is returned.
pub fn write_atomic_from_reader_with_mode_and_stability<R: Read + ?Sized>(
    &mut self,
    path: &str,
    reader: &mut R,
    mode: u32,
    stable: StableHow,
) -> Result<u64> {
    let temp = temporary_sibling_path(path)?;
    let handle = self.create_handle_new(&temp, mode)?;
    let outcome = match self.write_handle_from_reader(&handle, reader, stable) {
        Ok(written) => self.rename(&temp, path).map(|()| written),
        Err(err) => Err(err),
    };
    if outcome.is_err() {
        // Best effort: the caller cares about the original failure, not the cleanup.
        let _ = self.remove(&temp);
    }
    outcome
}
/// Appends `data` to the file at `path` using `StableHow::FileSync`.
pub fn append(&mut self, path: &str, data: &[u8]) -> Result<u64> {
    let stable = StableHow::FileSync;
    self.append_with_stability(path, data, stable)
}
/// Writes `data` at the size currently reported by GETATTR and returns `data.len()`.
///
/// The file must already exist; the size lookup and the write are separate calls.
pub fn append_with_stability(
    &mut self,
    path: &str,
    data: &[u8],
    stable: StableHow,
) -> Result<u64> {
    let fh = self.resolve(path)?;
    let end = self.nfs.getattr(&fh)?.size;
    self.write_handle_all(&fh, end, data, stable)?;
    // Convert the length through the shared checked helper rather than a bare cast.
    let mut appended = 0;
    advance_offset(&mut appended, data.len(), "APPEND")?;
    Ok(appended)
}
/// Appends the bytes of `reader` to the file at `path` using `StableHow::FileSync`.
pub fn append_from_reader<R: Read + ?Sized>(
    &mut self,
    path: &str,
    reader: &mut R,
) -> Result<u64> {
    let stable = StableHow::FileSync;
    self.append_from_reader_with_stability(path, reader, stable)
}
/// Drains `reader` into the file starting at the size currently reported by GETATTR
/// and returns the number of bytes written.
pub fn append_from_reader_with_stability<R: Read + ?Sized>(
    &mut self,
    path: &str,
    reader: &mut R,
    stable: StableHow,
) -> Result<u64> {
    let fh = self.resolve(path)?;
    let end = self.nfs.getattr(&fh)?.size;
    self.write_handle_from_reader_at(&fh, end, reader, stable)
}
/// Sets the size of the file at `path` to `size` through SETATTR.
pub fn truncate(&mut self, path: &str, size: u64) -> Result<()> {
    let fh = self.resolve(path)?;
    let resize = SetAttr::size(size);
    self.nfs.setattr(&fh, &resize)?;
    Ok(())
}
/// Resolves `path` and applies `attr` through SETATTR, discarding the reply.
pub fn setattr(&mut self, path: &str, attr: &SetAttr) -> Result<()> {
    let fh = self.resolve(path)?;
    self.nfs.setattr(&fh, attr)?;
    Ok(())
}
/// Applies `SetAttr::mode(mode)` to `path`.
pub fn set_mode(&mut self, path: &str, mode: u32) -> Result<()> {
    let change = SetAttr::mode(mode);
    self.setattr(path, &change)
}
/// Applies `SetAttr::uid(uid)` to `path`.
pub fn set_uid(&mut self, path: &str, uid: u32) -> Result<()> {
    let change = SetAttr::uid(uid);
    self.setattr(path, &change)
}
/// Applies `SetAttr::gid(gid)` to `path`.
pub fn set_gid(&mut self, path: &str, gid: u32) -> Result<()> {
    let change = SetAttr::gid(gid);
    self.setattr(path, &change)
}
/// Applies `SetAttr::ownership(uid, gid)` to `path`.
pub fn set_ownership(&mut self, path: &str, uid: u32, gid: u32) -> Result<()> {
    let change = SetAttr::ownership(uid, gid);
    self.setattr(path, &change)
}
/// Applies `SetAttr::times(atime, mtime)` to `path`.
pub fn set_times(
    &mut self,
    path: &str,
    atime: Option<NfsTime>,
    mtime: Option<NfsTime>,
) -> Result<()> {
    let change = SetAttr::times(atime, mtime);
    self.setattr(path, &change)
}
/// Applies `SetAttr::touch()` to `path`; the file must already exist.
pub fn touch(&mut self, path: &str) -> Result<()> {
    let change = SetAttr::touch();
    self.setattr(path, &change)
}
/// Writes `data` at `offset` into an existing file using `StableHow::FileSync`.
pub fn write_at(&mut self, path: &str, offset: u64, data: &[u8]) -> Result<()> {
    let stable = StableHow::FileSync;
    self.write_at_with_stability(path, offset, data, stable)
}
/// Writes `data` at `offset` into an existing file without truncating it.
pub fn write_at_with_stability(
    &mut self,
    path: &str,
    offset: u64,
    data: &[u8],
    stable: StableHow,
) -> Result<()> {
    let fh = self.resolve(path)?;
    self.write_handle_all(&fh, offset, data, stable)
}
/// Copies `from` to `to` using `StableHow::FileSync`; returns the number of bytes copied.
pub fn copy(&mut self, from: &str, to: &str) -> Result<u64> {
    let stable = StableHow::FileSync;
    self.copy_with_stability(from, to, stable)
}
/// Copies the contents of `from` into `to` and returns the number of bytes copied.
///
/// The destination is reused when it exists; when resolving it fails with `NoEnt` it is
/// created with `CreateHow::Unchecked` and the permission bits of the source
/// (`mode & 0o7777`). The destination is truncated to zero length before copying.
///
/// Creation goes through `create_handle_with_how`, the same helper `create_unchecked`
/// uses, so a CREATE reply without a handle is reported with that helper's message.
///
/// # Errors
///
/// Returns `Error::Protocol` when both paths normalize to the same components, plus any
/// error from resolution (other than `NoEnt` on `to`), CREATE, SETATTR or the copy loop.
pub fn copy_with_stability(&mut self, from: &str, to: &str, stable: StableHow) -> Result<u64> {
    if normalize_path(from)? == normalize_path(to)? {
        return Err(Error::Protocol(
            "copy source and destination must differ".to_owned(),
        ));
    }
    let source = self.resolve(from)?;
    let source_attr = self.nfs.getattr(&source)?;
    let target = match self.resolve(to) {
        Ok(handle) => handle,
        Err(Error::Nfs {
            status: NfsStatus::NoEnt,
            ..
        }) => self.create_handle_with_how(
            to,
            CreateHow::Unchecked(SetAttr {
                mode: Some(source_attr.mode & 0o7777),
                ..SetAttr::default()
            }),
        )?,
        Err(err) => return Err(err),
    };
    // Truncate first so a longer previous destination does not keep a stale tail.
    self.nfs.setattr(&target, &SetAttr::size(0))?;
    self.copy_handles(&source, &target, stable)
}
/// Atomic copy of `from` to `to` using `StableHow::FileSync`.
pub fn copy_atomic(&mut self, from: &str, to: &str) -> Result<u64> {
    let stable = StableHow::FileSync;
    self.copy_atomic_with_stability(from, to, stable)
}
/// Copies `from` into the path returned by `temporary_sibling_path(to)`, renames that
/// file onto `to` and returns the number of bytes copied.
///
/// The temporary file is created with the permission bits of the source. When it was
/// created but the copy or the rename fails, its removal is attempted (failure ignored)
/// and the original error is returned.
pub fn copy_atomic_with_stability(
    &mut self,
    from: &str,
    to: &str,
    stable: StableHow,
) -> Result<u64> {
    if normalize_path(from)? == normalize_path(to)? {
        return Err(Error::Protocol(
            "copy source and destination must differ".to_owned(),
        ));
    }
    let source = self.resolve(from)?;
    let source_attr = self.nfs.getattr(&source)?;
    let temp = temporary_sibling_path(to)?;
    let target = self.create_handle_new(&temp, source_attr.mode & 0o7777)?;
    let outcome = match self.copy_handles(&source, &target, stable) {
        Ok(copied) => self.rename(&temp, to).map(|()| copied),
        Err(err) => Err(err),
    };
    if outcome.is_err() {
        // Best effort: the caller cares about the original failure, not the cleanup.
        let _ = self.remove(&temp);
    }
    outcome
}
/// Resolves `path` and forwards `offset` and `count` to the COMMIT call.
pub fn commit(&mut self, path: &str, offset: u64, count: u32) -> Result<CommitResult> {
    let fh = self.resolve(path)?;
    self.nfs.commit(&fh, offset, count)
}
/// Creates `path` with a guarded CREATE; same behaviour as `create_new`.
pub fn create(&mut self, path: &str, mode: u32) -> Result<()> {
    self.create_handle_new(path, mode).map(|_| ())
}
/// Creates `path` with `CreateHow::Guarded` and the given `mode`, discarding the handle.
pub fn create_new(&mut self, path: &str, mode: u32) -> Result<()> {
    self.create_handle_new(path, mode)?;
    Ok(())
}
/// Creates `path` with `CreateHow::Unchecked` and the given `mode`, discarding the handle.
pub fn create_unchecked(&mut self, path: &str, mode: u32) -> Result<()> {
    let attrs = SetAttr {
        mode: Some(mode),
        ..SetAttr::default()
    };
    self.create_handle_with_how(path, CreateHow::Unchecked(attrs))?;
    Ok(())
}
/// Issues a guarded CREATE for `path` with `SetAttr::mode(mode)` and returns the handle.
fn create_handle_new(&mut self, path: &str, mode: u32) -> Result<FileHandle> {
    let how = CreateHow::Guarded(SetAttr::mode(mode));
    self.create_handle_with_how(path, how)
}
fn create_handle_with_how(&mut self, path: &str, how: CreateHow) -> Result<FileHandle> {
let (parent, name) = self.resolve_parent(path)?;
let result = self.nfs.create(&parent, &name, &how)?;
result
.object
.ok_or_else(|| Error::Protocol("CREATE succeeded without a file handle".into()))
}
/// Creates the directory `path` with `SetAttr::mode(mode)`; the parent must resolve.
pub fn mkdir(&mut self, path: &str, mode: u32) -> Result<()> {
    let (parent, name) = self.resolve_parent(path)?;
    let attrs = SetAttr::mode(mode);
    self.nfs.mkdir(&parent, &name, &attrs)?;
    Ok(())
}
/// Creates `path` and every missing ancestor directory, each with `SetAttr::mode(mode)`.
///
/// Walks the normalized components from the export root. A component that already
/// exists must be a directory. A component that is missing is created; if MKDIR then
/// reports `Exist` (something created it in the meantime) the component is looked up
/// again and accepted when it is a directory.
///
/// # Errors
///
/// Returns `Error::Protocol` when an existing component is not a directory, and any
/// other error from LOOKUP, MKDIR or GETATTR.
pub fn create_dir_all(&mut self, path: &str, mode: u32) -> Result<()> {
    let components = normalize_path(path)?;
    let mut current = self.root.clone();
    // Tracked only for error messages produced by `ensure_directory`.
    let mut current_path = String::from("/");
    for component in components {
        current_path = join_path(&current_path, component);
        match self.nfs.lookup(&current, component) {
            Ok(lookup) => {
                self.ensure_directory(&current_path, &lookup.object, lookup.object_attributes)?;
                current = lookup.object;
            }
            Err(Error::Nfs {
                status: NfsStatus::NoEnt,
                ..
            }) => match self.nfs.mkdir(&current, component, &SetAttr::mode(mode)) {
                Ok(created) => {
                    current = match created.object {
                        Some(handle) => handle,
                        // The MKDIR reply may omit the handle; fetch it explicitly.
                        None => self.nfs.lookup(&current, component)?.object,
                    };
                }
                Err(Error::Nfs {
                    status: NfsStatus::Exist,
                    ..
                }) => {
                    let lookup = self.nfs.lookup(&current, component)?;
                    self.ensure_directory(
                        &current_path,
                        &lookup.object,
                        lookup.object_attributes,
                    )?;
                    current = lookup.object;
                }
                Err(err) => return Err(err),
            },
            Err(err) => return Err(err),
        }
    }
    Ok(())
}
/// Creates a symbolic link named `path` that points to `target`, with default attributes.
pub fn symlink(&mut self, path: &str, target: &str) -> Result<()> {
    let (parent, name) = self.resolve_parent(path)?;
    let attrs = SetAttr::default();
    self.nfs.symlink(&parent, &name, target, &attrs)?;
    Ok(())
}
/// Creates `link` as a hard link to the object at `existing`.
pub fn hard_link(&mut self, existing: &str, link: &str) -> Result<()> {
    let source = self.resolve(existing)?;
    let (dir, name) = self.resolve_parent(link)?;
    self.nfs.link(&source, &dir, &name)?;
    Ok(())
}
/// Issues REMOVE for the last component of `path` inside its resolved parent.
pub fn remove(&mut self, path: &str) -> Result<()> {
    let (dir, name) = self.resolve_parent(path)?;
    self.nfs.remove(&dir, &name)?;
    Ok(())
}
/// Like `remove`, but maps a not-found error to `Ok(false)`; `Ok(true)` means removed.
pub fn remove_if_exists(&mut self, path: &str) -> Result<bool> {
    match self.remove(path) {
        Ok(()) => Ok(true),
        Err(err) => {
            if err.is_not_found() {
                Ok(false)
            } else {
                Err(err)
            }
        }
    }
}
/// Issues RMDIR for the last component of `path` inside its resolved parent.
pub fn rmdir(&mut self, path: &str) -> Result<()> {
    let (dir, name) = self.resolve_parent(path)?;
    self.nfs.rmdir(&dir, &name)?;
    Ok(())
}
/// Like `rmdir`, but maps a not-found error to `Ok(false)`; `Ok(true)` means removed.
pub fn rmdir_if_exists(&mut self, path: &str) -> Result<bool> {
    match self.rmdir(path) {
        Ok(()) => Ok(true),
        Err(err) => {
            if err.is_not_found() {
                Ok(false)
            } else {
                Err(err)
            }
        }
    }
}
/// Removes `path` and, when it is a directory, everything beneath it.
///
/// Uses an explicit work stack instead of recursion. A directory pushes its own
/// removal first and its children afterwards, so the children are processed before
/// the directory itself is removed.
///
/// # Errors
///
/// Returns `Error::InvalidPath` when `path` normalizes to no components (the export
/// root), and the first error from any metadata, listing or removal call.
pub fn remove_all(&mut self, path: &str) -> Result<()> {
    enum Step {
        Visit(String, Option<FileType>),
        RemoveDir(String),
    }

    if normalize_path(path)?.is_empty() {
        return Err(Error::InvalidPath(path.to_owned()));
    }
    let root_type = self.metadata(path)?.file_type;
    let mut pending = vec![Step::Visit(path.to_owned(), Some(root_type))];
    while let Some(step) = pending.pop() {
        let (target, known_type) = match step {
            Step::RemoveDir(dir) => {
                self.rmdir(&dir)?;
                continue;
            }
            Step::Visit(target, known_type) => (target, known_type),
        };
        // Listings may omit attributes; fall back to an explicit metadata lookup.
        let kind = match known_type {
            Some(kind) => kind,
            None => self.metadata(&target)?.file_type,
        };
        if kind != FileType::Directory {
            self.remove(&target)?;
            continue;
        }
        pending.push(Step::RemoveDir(target.clone()));
        let listing = self.read_dir(&target)?;
        for entry in listing.into_iter().rev() {
            if entry.name == "." || entry.name == ".." {
                continue;
            }
            let child = join_path(&target, &entry.name);
            let child_type = entry.attributes.map(|attr| attr.file_type);
            pending.push(Step::Visit(child, child_type));
        }
    }
    Ok(())
}
/// Like `remove_all`, but maps a not-found error to `Ok(false)`.
pub fn remove_all_if_exists(&mut self, path: &str) -> Result<bool> {
    match self.remove_all(path) {
        Ok(()) => Ok(true),
        Err(err) => {
            if err.is_not_found() {
                Ok(false)
            } else {
                Err(err)
            }
        }
    }
}
/// Issues RENAME from `from` to `to`; both parent directories must resolve.
pub fn rename(&mut self, from: &str, to: &str) -> Result<()> {
    let (source_dir, source_name) = self.resolve_parent(from)?;
    let (target_dir, target_name) = self.resolve_parent(to)?;
    self.nfs
        .rename(&source_dir, &source_name, &target_dir, &target_name)?;
    Ok(())
}
/// Lists the directory at `path`, bounded by the client's `max_dir_entries`.
pub fn read_dir(&mut self, path: &str) -> Result<Vec<DirEntry>> {
    let limit = self.max_dir_entries;
    self.read_dir_with_limit(path, limit)
}
/// Lists the directory at `path` with the smaller of `max_entries` and the client's
/// own `max_dir_entries` as the entry limit.
pub fn read_dir_limited(&mut self, path: &str, max_entries: usize) -> Result<Vec<DirEntry>> {
    validate_max_dir_entries(max_entries)?;
    let limit = max_entries.min(self.max_dir_entries);
    self.read_dir_with_limit(path, limit)
}
/// Fetches one directory page after `cursor`, bounded by the client's `max_dir_entries`.
pub fn read_dir_page(&mut self, path: &str, cursor: Option<DirPageCursor>) -> Result<DirPage> {
    let limit = self.max_dir_entries;
    self.read_dir_page_limited(path, cursor, limit)
}
/// Fetches a single directory batch starting at `cursor` (or the default cursor) and
/// converts it into a page of at most `min(max_entries, self.max_dir_entries)` entries.
///
/// READDIRPLUS is tried first; when the server answers `NotSupported` the same
/// position is requested again through READDIR.
pub fn read_dir_page_limited(
    &mut self,
    path: &str,
    cursor: Option<DirPageCursor>,
    max_entries: usize,
) -> Result<DirPage> {
    validate_max_dir_entries(max_entries)?;
    let limit = max_entries.min(self.max_dir_entries);
    let dir = self.resolve(path)?;
    let cursor = cursor.unwrap_or_default();
    let size = self.dir_size;
    let batch = match self
        .nfs
        .readdirplus(&dir, cursor.cookie, cursor.cookieverf, size, size)
    {
        Err(Error::Nfs {
            status: NfsStatus::NotSupported,
            ..
        }) => self
            .nfs
            .readdir(&dir, cursor.cookie, cursor.cookieverf, size)?,
        outcome => outcome?,
    };
    dir_page_from_batch(&batch, limit)
}
/// Reads directory batches until the server reports EOF and returns the collected entries.
///
/// Each batch is requested with READDIRPLUS, falling back to READDIR when the server
/// answers `NotSupported`. The next request continues from the cookie of the last
/// entry and the verifier of the batch just received.
///
/// # Errors
///
/// Returns `Error::Protocol` when a batch without EOF contains no entries, and any
/// error from resolution, the directory calls or `append_dir_entries`.
fn read_dir_with_limit(&mut self, path: &str, max_entries: usize) -> Result<Vec<DirEntry>> {
    let dir = self.resolve(path)?;
    let size = self.dir_size;
    let mut collected = Vec::new();
    let mut cookie = 0;
    let mut cookieverf = [0; 8];
    loop {
        let batch = match self.nfs.readdirplus(&dir, cookie, cookieverf, size, size) {
            Err(Error::Nfs {
                status: NfsStatus::NotSupported,
                ..
            }) => self.nfs.readdir(&dir, cookie, cookieverf, size)?,
            outcome => outcome?,
        };
        append_dir_entries(&mut collected, &batch, max_entries)?;
        if batch.eof {
            return Ok(collected);
        }
        match batch.entries.last() {
            Some(last) => {
                cookie = last.cookie;
                cookieverf = batch.cookieverf;
            }
            // Without a last entry there is no cookie to continue from.
            None => {
                return Err(Error::Protocol(
                    "READDIR returned no entries before EOF".to_owned(),
                ));
            }
        }
    }
}
/// Consumes the client, opens a new MOUNT connection to the stored host and port and
/// issues an unmount for the export.
pub fn unmount(self) -> Result<()> {
    let timeout = self.builder.timeout;
    let address = (self.target.host.as_str(), self.mount_port);
    let mut mount = MountClient::connect_with_timeout(address, self.auth.clone(), timeout)?;
    mount.set_timeout(timeout)?;
    mount.unmount(&self.target.export)
}
fn connect_with_builder(builder: ClientBuilder) -> Result<Self> {
if !builder.target.export.starts_with('/') {
return Err(Error::InvalidTarget(format!(
"{}:{}",
builder.target.host, builder.target.export
)));
}
let read_size_limit = validate_transfer_size("read_size", builder.read_size_limit)?;
let write_size_limit = validate_transfer_size("write_size", builder.write_size_limit)?;
let dir_size_limit = validate_transfer_size("dir_size", builder.dir_size_limit)?;
validate_max_dir_entries(builder.max_dir_entries)?;
let stored_builder = builder.clone();
let mount_port = match builder.mount_port {
Some(port) => port,
None => portmap::get_tcp_port_with_timeout(
&builder.target.host,
MOUNT_PROGRAM,
MOUNT_VERSION,
builder.timeout,
)?,
};
let mut mount = MountClient::connect_with_timeout(
(builder.target.host.as_str(), mount_port),
builder.auth.clone(),
builder.timeout,
)?;
mount.set_timeout(builder.timeout)?;
let mount_info = mount.mount(&builder.target.export)?;
let nfs_port = match builder.nfs_port {
Some(port) => port,
None => portmap::get_tcp_port_with_timeout(
&builder.target.host,
NFS_PROGRAM,
NFS_VERSION,
builder.timeout,
)
.unwrap_or(NFS_PORT),
};
let mut nfs = Nfs3Client::connect_with_timeout(
(builder.target.host.as_str(), nfs_port),
builder.auth.clone(),
builder.timeout,
)?;
nfs.set_timeout(builder.timeout)?;
nfs.set_retry_policy(builder.retry_policy);
let fsinfo = nfs.fsinfo(&mount_info.file_handle).ok();
let read_size = fsinfo
.as_ref()
.map(|info| clamp_io_size(info.read_preferred, info.read_max, read_size_limit))
.unwrap_or(read_size_limit);
let write_size = fsinfo
.as_ref()
.map(|info| clamp_io_size(info.write_preferred, info.write_max, write_size_limit))
.unwrap_or(write_size_limit);
let dir_size = fsinfo
.as_ref()
.map(|info| clamp_dir_size(info.dir_preferred, dir_size_limit))
.unwrap_or(dir_size_limit);
nfs.set_max_record_size(max_record_size_for_payloads(&[
read_size, write_size, dir_size,
]));
Ok(Self {
builder: stored_builder,
target: builder.target,
auth: builder.auth,
mount_port,
root: mount_info.file_handle,
nfs,
fsinfo,
read_size,
write_size,
dir_size,
max_dir_entries: builder.max_dir_entries,
})
}
/// Resolves `path` to a file handle by looking up each normalized component in turn,
/// starting from the export root.
///
/// A path that normalizes to no components resolves to the root handle itself.
///
/// # Errors
///
/// Returns the error from `normalize_path` or from the first failing LOOKUP.
fn resolve(&mut self, path: &str) -> Result<FileHandle> {
    let components = normalize_path(path)?;
    let mut current = self.root.clone();
    for component in components {
        current = self.nfs.lookup(&current, component)?.object;
    }
    Ok(current)
}
/// Splits `path` into its parent directory handle and its final component.
///
/// Every component except the last is looked up from the export root; the last one is
/// returned as an owned name without being looked up, so it does not need to exist.
///
/// # Errors
///
/// Returns `Error::InvalidPath` when `path` normalizes to no components, and the
/// error from `normalize_path` or from the first failing LOOKUP.
fn resolve_parent(&mut self, path: &str) -> Result<(FileHandle, String)> {
    let mut components = normalize_path(path)?;
    let name = components
        .pop()
        .ok_or_else(|| Error::InvalidPath(path.to_owned()))?
        .to_owned();
    let mut current = self.root.clone();
    for component in components {
        current = self.nfs.lookup(&current, component)?.object;
    }
    Ok((current, name))
}
fn ensure_directory(
&mut self,
path: &str,
handle: &FileHandle,
attr: Option<FileAttr>,
) -> Result<()> {
let attr = match attr {
Some(attr) => attr,
None => self.nfs.getattr(handle)?,
};
if attr.file_type == FileType::Directory {
Ok(())
} else {
Err(Error::Protocol(format!(
"{path:?} exists but is not a directory"
)))
}
}
/// Reads the whole file behind `handle` into memory.
///
/// Delegates to `read_handle_to_writer` with a `Vec<u8>` as the sink, which applies
/// the same size and EOF checks to every READ reply.
fn read_handle_to_end(&mut self, handle: &FileHandle) -> Result<Vec<u8>> {
    let mut out = Vec::new();
    self.read_handle_to_writer(handle, &mut out)?;
    Ok(out)
}
fn read_handle_to_writer<W: Write + ?Sized>(
&mut self,
handle: &FileHandle,
writer: &mut W,
) -> Result<u64> {
let mut offset = 0;
let mut total = 0;
loop {
let chunk = self.nfs.read(handle, offset, self.read_size)?;
if chunk.data.len() > self.read_size as usize {
return Err(Error::Protocol(format!(
"READ returned {} bytes for a {} byte request",
chunk.data.len(),
self.read_size
)));
}
if chunk.data.is_empty() {
if chunk.eof {
return Ok(total);
}
return Err(Error::Protocol(
"READ returned no data before EOF".to_owned(),
));
}
writer.write_all(&chunk.data)?;
advance_offset(&mut offset, chunk.data.len(), "READ")?;
advance_offset(&mut total, chunk.data.len(), "READ total")?;
if chunk.eof {
return Ok(total);
}
}
}
/// Reads at most `count` bytes at `offset`; a `u32` front end for `read_handle_range`.
fn read_handle_at(&mut self, handle: &FileHandle, offset: u64, count: u32) -> Result<Vec<u8>> {
    let wide_count = u64::from(count);
    self.read_handle_range(handle, offset, wide_count)
}
fn read_handle_range(
&mut self,
handle: &FileHandle,
offset: u64,
count: u64,
) -> Result<Vec<u8>> {
let capacity = usize::try_from(count).unwrap_or(usize::MAX);
let mut out = Vec::with_capacity(capacity.min(self.read_size as usize));
self.read_handle_range_to_writer(handle, offset, count, &mut out)?;
Ok(out)
}
fn read_handle_range_to_writer<W: Write + ?Sized>(
&mut self,
handle: &FileHandle,
mut offset: u64,
mut remaining: u64,
writer: &mut W,
) -> Result<u64> {
let mut total = 0;
while remaining > 0 {
let request = cmp::min(u64::from(self.read_size), remaining) as u32;
let chunk = self.nfs.read(handle, offset, request)?;
if chunk.data.len() > request as usize {
return Err(Error::Protocol(format!(
"READ returned {} bytes for a {request} byte request",
chunk.data.len()
)));
}
if chunk.data.is_empty() {
if chunk.eof {
return Ok(total);
}
return Err(Error::Protocol(
"READ returned no data before EOF".to_owned(),
));
}
writer.write_all(&chunk.data)?;
advance_offset(&mut offset, chunk.data.len(), "READ")?;
advance_offset(&mut total, chunk.data.len(), "READ total")?;
remaining -= chunk.data.len() as u64;
if chunk.eof {
return Ok(total);
}
}
Ok(total)
}
/// Writes all of `data` to `handle` starting at `offset`, in requests of at most
/// `write_size` bytes, continuing after short writes.
///
/// When a reply's commit level does not satisfy `stable`, a COMMIT is issued for the
/// range just written and its verifier must equal the verifier of the WRITE reply.
///
/// # Errors
///
/// Returns `Error::Protocol` when the server accepts zero bytes, reports more bytes
/// than were sent, or the COMMIT verifier differs; plus any WRITE, COMMIT or
/// offset-overflow error.
fn write_handle_all(
    &mut self,
    handle: &FileHandle,
    mut offset: u64,
    mut data: &[u8],
    stable: StableHow,
) -> Result<()> {
    while !data.is_empty() {
        let chunk_len = cmp::min(data.len(), self.write_size as usize);
        let reply = self.nfs.write(handle, offset, stable, &data[..chunk_len])?;
        if reply.count == 0 {
            // Zero progress would loop forever; treat it as a protocol violation.
            return Err(Error::Protocol("WRITE accepted zero bytes".to_owned()));
        }
        let written = reply.count as usize;
        if written > chunk_len {
            return Err(Error::Protocol(format!(
                "WRITE reported {written} bytes for a {chunk_len} byte request"
            )));
        }
        let durable_enough = reply.committed.satisfies(stable);
        if !durable_enough {
            let commit = self.nfs.commit(handle, offset, reply.count)?;
            if commit.verifier != reply.verifier {
                return Err(Error::Protocol(
                    "COMMIT verifier changed after unstable WRITE".to_owned(),
                ));
            }
        }
        advance_offset(&mut offset, written, "WRITE")?;
        data = &data[written..];
    }
    Ok(())
}
/// Drains `reader` into `handle` starting at offset zero; returns the bytes written.
fn write_handle_from_reader<R: Read + ?Sized>(
    &mut self,
    handle: &FileHandle,
    reader: &mut R,
    stable: StableHow,
) -> Result<u64> {
    let start = 0;
    self.write_handle_from_reader_at(handle, start, reader, stable)
}
/// Drains `reader` into `handle` starting at `offset` and returns the bytes written.
///
/// Reads into a buffer of `write_size` bytes and writes each filled portion in full
/// before reading again; a read of zero bytes ends the transfer.
fn write_handle_from_reader_at<R: Read + ?Sized>(
    &mut self,
    handle: &FileHandle,
    mut offset: u64,
    reader: &mut R,
    stable: StableHow,
) -> Result<u64> {
    let mut buffer = vec![0; self.write_size as usize];
    let mut written = 0;
    loop {
        match reader.read(&mut buffer)? {
            0 => return Ok(written),
            filled => {
                self.write_handle_all(handle, offset, &buffer[..filled], stable)?;
                advance_offset(&mut offset, filled, "WRITE reader")?;
                advance_offset(&mut written, filled, "WRITE reader total")?;
            }
        }
    }
}
fn copy_handles(
&mut self,
source: &FileHandle,
target: &FileHandle,
stable: StableHow,
) -> Result<u64> {
let mut offset = 0;
loop {
let chunk = self.nfs.read(source, offset, self.read_size)?;
if chunk.data.len() > self.read_size as usize {
return Err(Error::Protocol(format!(
"READ returned {} bytes for a {} byte request",
chunk.data.len(),
self.read_size
)));
}
if chunk.data.is_empty() {
if chunk.eof {
return Ok(offset);
}
return Err(Error::Protocol(
"READ returned no data before EOF".to_owned(),
));
}
self.write_handle_all(target, offset, &chunk.data, stable)?;
advance_offset(&mut offset, chunk.data.len(), "COPY")?;
if chunk.eof {
return Ok(offset);
}
}
}
}