use crate::dir::Dir;
use crate::error::{ClientResultExt, ZeroFsError};
use crate::file::File;
use crate::path::{components, display, display_path, split_parent};
use crate::session::{FidGuard, Session};
use crate::types::{
Capabilities, ConnectOptions, DirEntry, FileType, Metadata, NodeKind, OpenOptions, SetAttrs,
SetTime, StatFs,
};
use bytes::{Bytes, BytesMut};
use ninep_client::{NOFID, NinePClient};
use ninep_proto::Stat;
use std::collections::VecDeque;
use std::ffi::OsString;
use std::os::unix::ffi::{OsStrExt, OsStringExt};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::atomic::Ordering;
use std::time::Duration;
/// TCP port appended to a connection target that does not name a port itself.
const DEFAULT_9P_PORT: u16 = 5564;
/// Maximum number of symbolic links followed while resolving one path; exceeding it
/// yields `ZeroFsError::TooManySymlinks`.
const MAX_SYMLINK_HOPS: u32 = 40;
/// Handle to an attached 9P session; every path-based filesystem operation goes through it.
pub struct Client {
// Shared session state; the same `Arc` is cloned into the `Dir` and `File` handles
// this client hands out.
session: Arc<Session>,
}
impl std::fmt::Debug for Client {
    /// Renders the client as a non-exhaustive struct showing only the session's `closed` flag.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let is_closed = self.session.closed.load(Ordering::Relaxed);
        let mut builder = f.debug_struct("Client");
        builder.field("closed", &is_closed);
        builder.finish_non_exhaustive()
    }
}
impl Client {
/// Connects to `target` using `ConnectOptions::default()`.
///
/// See [`Client::connect_with`] for the accepted target forms and error behavior.
pub async fn connect(target: &str) -> Result<Arc<Client>, ZeroFsError> {
    let defaults = ConnectOptions::default();
    Self::connect_with(target, defaults).await
}
/// Connects to `target` and attaches, using the supplied options.
///
/// When `opts.connect_timeout_ms` is set, the entire dial-and-attach sequence is bounded
/// by that many milliseconds; otherwise it runs without a client-side deadline.
///
/// # Errors
/// Returns `ZeroFsError::ConnectFailed` if the deadline elapses; any error produced by
/// the connection attempt itself is passed through unchanged.
pub async fn connect_with(
    target: &str,
    opts: ConnectOptions,
) -> Result<Arc<Client>, ZeroFsError> {
    let attempt = Self::establish(target, &opts);
    let Some(ms) = opts.connect_timeout_ms else {
        return attempt.await;
    };
    let deadline = Duration::from_millis(ms as u64);
    tokio::time::timeout(deadline, attempt)
        .await
        .unwrap_or_else(|_| {
            Err(ZeroFsError::ConnectFailed {
                message: format!("connecting to {target}: timed out after {ms} ms"),
            })
        })
}
/// Dials `target`, attaches to `opts.aname`, and wraps the result in a `Client`.
///
/// Identity fields missing from `opts` fall back to the current process: the effective
/// uid and gid, and for the user name the `USER` environment variable (or the decimal
/// uid when that variable cannot be read).
///
/// # Errors
/// Dial errors are propagated; an attach failure becomes `ZeroFsError::ConnectFailed`.
async fn establish(target: &str, opts: &ConnectOptions) -> Result<Arc<Client>, ZeroFsError> {
    let client = dial(target, opts.msize).await?;
    let uid = match opts.uid {
        Some(id) => id,
        // SAFETY: `geteuid` takes no arguments and has no preconditions.
        None => unsafe { libc::geteuid() },
    };
    let gid = match opts.gid {
        Some(id) => id,
        // SAFETY: `getegid` takes no arguments and has no preconditions.
        None => unsafe { libc::getegid() },
    };
    let uname = opts
        .uname
        .clone()
        .unwrap_or_else(|| std::env::var("USER").unwrap_or_else(|_| uid.to_string()));
    let root_fid = client.alloc_fid();
    if let Err(e) = client
        .attach(root_fid, NOFID, &uname, &opts.aname, uid)
        .await
    {
        return Err(ZeroFsError::ConnectFailed {
            message: format!("attach to {target} failed: {e}"),
        });
    }
    Ok(Arc::new(Client {
        session: Session::new(client, root_fid, gid),
    }))
}
/// Snapshot of the protocol extensions and size limits reported by the underlying 9P client.
pub fn capabilities(&self) -> Capabilities {
    let client = &self.session.client;
    let extensions_v1 = client.extensions_enabled();
    let extensions_v2 = client.extensions_v2_enabled();
    let msize = client.msize();
    let max_read_chunk = client.max_io();
    let max_write_chunk = client.max_write_payload();
    Capabilities {
        extensions_v1,
        extensions_v2,
        msize,
        max_read_chunk,
        max_write_chunk,
    }
}
/// Number of fids the underlying 9P client currently reports as outstanding.
#[doc(hidden)]
pub fn outstanding_fids(&self) -> usize {
    let client = &self.session.client;
    client.outstanding_fids()
}
/// Marks the session closed and queues a clunk of the root fid.
///
/// Only the first call does anything; once the `closed` flag is set, later calls
/// return immediately.
pub async fn close(&self) {
    let session = &self.session;
    let already_closed = session.closed.swap(true, Ordering::AcqRel);
    if !already_closed {
        session.enqueue_clunk(session.root_fid);
    }
}
/// Reads the whole file at `path` into memory.
///
/// The file is read in chunks of the client's `max_io()` size (at least 1); a chunk
/// shorter than that size is treated as end of file. A file that fits in a single
/// chunk is returned without copying into a second buffer.
///
/// # Errors
/// Propagates open and read failures, annotated with the displayed path.
pub async fn read(&self, path: impl AsRef<Path>) -> Result<Bytes, ZeroFsError> {
    let path = path.as_ref();
    let pd = display(path);
    let (guard, stat) = self.open_read(path, &pd).await?;
    let client = &self.session.client;
    let chunk = client.max_io().max(1);
    let head = client.read_bytes(guard.fid(), 0, chunk).await.ctx(&pd)?;
    if (head.len() as u32) < chunk {
        return Ok(head);
    }
    // Pre-size from the stat'ed length when one is available, but never reserve more
    // than two chunks up front.
    let ceiling = (chunk as usize).saturating_mul(2);
    let hint = match stat.as_ref() {
        Some(s) => s.size as usize,
        None => 0,
    };
    let mut buf = BytesMut::with_capacity(hint.min(ceiling));
    buf.extend_from_slice(&head);
    loop {
        let offset = buf.len() as u64;
        let piece = client
            .read_bytes(guard.fid(), offset, chunk)
            .await
            .ctx(&pd)?;
        buf.extend_from_slice(&piece);
        if (piece.len() as u32) < chunk {
            break;
        }
    }
    Ok(buf.freeze())
}
/// Opens `path` read-only and issues one read of up to `len` bytes starting at `offset`.
///
/// NOTE(review): only a single `read_bytes` call is made, so a `len` larger than the
/// client's I/O size presumably yields a short result — confirm against `read_bytes`.
///
/// # Errors
/// Propagates open and read failures, annotated with the displayed path.
pub async fn read_range(
    &self,
    path: impl AsRef<Path>,
    offset: u64,
    len: u32,
) -> Result<Bytes, ZeroFsError> {
    let path = path.as_ref();
    let pd = display(path);
    let (guard, _) = self.open_read(path, &pd).await?;
    let client = &self.session.client;
    let outcome = client.read_bytes(guard.fid(), offset, len).await;
    outcome.ctx(&pd)
}
/// Replaces the contents of `path` with `data`.
///
/// The file is opened write-only with the create and truncate options set, then
/// `data` is written starting at offset 0.
pub async fn write(&self, path: impl AsRef<Path>, data: &[u8]) -> Result<(), ZeroFsError> {
    let path = path.as_ref();
    let pd = display(path);
    let create_truncate = OpenOptions::write_only().create(true).truncate(true);
    let guard = self
        .open_relative_path(path, &pd, &create_truncate)
        .await?;
    let fid = guard.fid();
    self.session.write_all(fid, 0, data, &pd).await
}
/// Writes `data` at the current end of `path`, creating the file if needed.
///
/// The end offset is taken from a stat of the freshly opened fid, so the operation is
/// a stat followed by a write rather than a single atomic append.
///
/// Returns the offset at which `data` was written (the size before the write).
pub async fn append(&self, path: impl AsRef<Path>, data: &[u8]) -> Result<u64, ZeroFsError> {
    let path = path.as_ref();
    let pd = display(path);
    let create_only = OpenOptions::write_only().create(true);
    let guard = self.open_relative_path(path, &pd, &create_only).await?;
    let end = self.session.stat_fid(guard.fid(), &pd).await?.size;
    self.session.write_all(guard.fid(), end, data, &pd).await?;
    Ok(end)
}
/// Walks to the parent directory of `path`.
///
/// Returns a guard for the parent's fid together with the final component of `path`.
/// Fails if the session is closed, the path cannot be split into components, or it has
/// no final component to split off.
async fn parent_of<'a>(
    &self,
    path: &'a Path,
    pd: &str,
) -> Result<(FidGuard, &'a [u8]), ZeroFsError> {
    self.session.check_open()?;
    let parts = components(path)?;
    let (dir_parts, leaf) = split_parent(&parts, pd)?;
    let walked = self.session.walk(dir_parts, pd).await?;
    Ok((walked.0, leaf))
}
/// Walks to `path` and opens the resulting fid with `O_RDONLY`.
///
/// Returns the fid guard plus whatever stat the walk produced (which may be `None`).
async fn open_read(
    &self,
    path: &Path,
    pd: &str,
) -> Result<(FidGuard, Option<Stat>), ZeroFsError> {
    let session = &self.session;
    session.check_open()?;
    let names = components(path)?;
    let walked = session.walk(&names, pd).await?;
    session
        .lopen(walked.0.fid(), libc::O_RDONLY as u32, pd)
        .await?;
    Ok(walked)
}
/// Opens the final component of `path` relative to its parent directory, using `opts`.
async fn open_relative_path(
    &self,
    path: &Path,
    pd: &str,
    opts: &OpenOptions,
) -> Result<FidGuard, ZeroFsError> {
    let (parent, leaf) = self.parent_of(path, pd).await?;
    let parent_fid = parent.fid();
    self.session
        .open_relative(parent_fid, leaf, opts, pd)
        .await
}
/// Returns metadata for the node reached by walking `path` literally from the attach root.
///
/// Unlike [`Client::metadata`], this performs no client-side symlink resolution.
pub async fn stat(&self, path: impl AsRef<Path>) -> Result<Metadata, ZeroFsError> {
    let path = path.as_ref();
    let pd = display(path);
    let session = &self.session;
    session.check_open()?;
    let names = components(path)?;
    let (_guard, raw) = session
        .walk_stat_from(session.root_fid, &names, &pd)
        .await?;
    Ok(Metadata::from_stat(&raw))
}
/// Returns metadata for `path` after resolving symbolic links on the client side.
pub async fn metadata(&self, path: impl AsRef<Path>) -> Result<Metadata, ZeroFsError> {
    let path = path.as_ref();
    let pd = display(path);
    self.session.check_open()?;
    self.resolve(path, &pd)
        .await
        .map(|(_, target_stat)| Metadata::from_stat(&target_stat))
}
/// Resolves symbolic links in `path` and returns the result as an absolute path.
///
/// Each resolved component is prefixed with `/`; an empty resolution (the attach root)
/// is returned as `/`.
pub async fn canonicalize(&self, path: impl AsRef<Path>) -> Result<PathBuf, ZeroFsError> {
    let path = path.as_ref();
    let pd = display(path);
    self.session.check_open()?;
    let (parts, _) = self.resolve(path, &pd).await?;
    let mut bytes: Vec<u8> = parts
        .iter()
        .flat_map(|part| std::iter::once(b'/').chain(part.iter().copied()))
        .collect();
    if bytes.is_empty() {
        bytes.push(b'/');
    }
    Ok(PathBuf::from(OsString::from_vec(bytes)))
}
/// Reports whether `path` can be stat'ed.
///
/// A `NotFound` error maps to `Ok(false)`; every other error is returned to the caller.
/// The check goes through [`Client::stat`], so no client-side symlink resolution is done.
pub async fn exists(&self, path: impl AsRef<Path>) -> Result<bool, ZeroFsError> {
    match self.stat(path).await {
        Err(ZeroFsError::NotFound { .. }) => Ok(false),
        other => other.map(|_| true),
    }
}
/// Applies `attrs` to the node at `path` and returns the metadata reported afterwards.
pub async fn set_attr(
    &self,
    path: impl AsRef<Path>,
    attrs: SetAttrs,
) -> Result<Metadata, ZeroFsError> {
    let path = path.as_ref();
    let pd = display(path);
    let session = &self.session;
    session.check_open()?;
    let names = components(path)?;
    let (guard, _) = session.walk(&names, &pd).await?;
    let updated = session.setattr_fid(guard.fid(), &attrs, &pd).await?;
    Ok(Metadata::from_stat(&updated))
}
/// Sets the mode of `path`; shorthand for [`Client::set_attr`] with only `mode` filled in.
pub async fn chmod(&self, path: impl AsRef<Path>, mode: u32) -> Result<Metadata, ZeroFsError> {
    let attrs = SetAttrs {
        mode: Some(mode),
        ..Default::default()
    };
    self.set_attr(path, attrs).await
}
/// Changes the owner and/or group of `path`; a `None` argument is passed through as
/// "not set" to [`Client::set_attr`].
pub async fn chown(
    &self,
    path: impl AsRef<Path>,
    uid: Option<u32>,
    gid: Option<u32>,
) -> Result<Metadata, ZeroFsError> {
    let attrs = SetAttrs {
        uid,
        gid,
        ..Default::default()
    };
    self.set_attr(path, attrs).await
}
/// Sets the size of `path` to `size`; shorthand for [`Client::set_attr`] with only
/// `size` filled in.
pub async fn truncate(
    &self,
    path: impl AsRef<Path>,
    size: u64,
) -> Result<Metadata, ZeroFsError> {
    let attrs = SetAttrs {
        size: Some(size),
        ..Default::default()
    };
    self.set_attr(path, attrs).await
}
/// Updates the access and/or modification time of `path`; a `None` argument is passed
/// through as "not set" to [`Client::set_attr`].
pub async fn set_times(
    &self,
    path: impl AsRef<Path>,
    atime: Option<SetTime>,
    mtime: Option<SetTime>,
) -> Result<Metadata, ZeroFsError> {
    let attrs = SetAttrs {
        atime,
        mtime,
        ..Default::default()
    };
    self.set_attr(path, attrs).await
}
/// Queries filesystem statistics through the root fid.
///
/// Errors are annotated with the path `/`.
pub async fn statfs(&self) -> Result<StatFs, ZeroFsError> {
    let session = &self.session;
    session.check_open()?;
    let wire = session.client.statfs(session.root_fid).await.ctx("/")?;
    Ok(StatFs::from_wire(&wire))
}
/// Issues an fsync on the root fid (with a second argument of 0).
///
/// Errors are annotated with the path `/`.
pub async fn sync(&self) -> Result<(), ZeroFsError> {
    let session = &self.session;
    session.check_open()?;
    let flushed = session.client.fsync(session.root_fid, 0).await;
    flushed.ctx("/")
}
/// Creates a single directory at `path` with the given `mode`.
///
/// The parent must already be walkable; see [`Client::create_dir_all`] to create
/// missing ancestors as well.
pub async fn create_dir(
    &self,
    path: impl AsRef<Path>,
    mode: u32,
) -> Result<Metadata, ZeroFsError> {
    let path = path.as_ref();
    let pd = display(path);
    let (parent, leaf) = self.parent_of(path, &pd).await?;
    let parent_fid = parent.fid();
    self.session.mkdir_at(parent_fid, leaf, mode, &pd).await
}
/// Creates `path` and any missing ancestors, each with the given `mode`.
///
/// Every prefix of the path is attempted in turn and `AlreadyExists` is ignored. After
/// the loop, the final path is checked (following symlinks) to confirm it really is a
/// directory.
///
/// # Errors
/// Returns `ZeroFsError::NotADirectory` if the final path exists but is not a
/// directory; any other error from walking or `mkdir` is propagated.
pub async fn create_dir_all(
    &self,
    path: impl AsRef<Path>,
    mode: u32,
) -> Result<(), ZeroFsError> {
    let path = path.as_ref();
    self.session.check_open()?;
    let names = components(path)?;
    if names.is_empty() {
        // The attach root: nothing to create and nothing to verify.
        return Ok(());
    }
    for prefix in (1..=names.len()).map(|end| &names[..end]) {
        let pd = display_path(prefix);
        let (parents, leaf) = split_parent(prefix, &pd)?;
        let (parent, _) = self.session.walk(parents, &pd).await?;
        match self.session.mkdir_at(parent.fid(), leaf, mode, &pd).await {
            Err(e) if !matches!(e, ZeroFsError::AlreadyExists { .. }) => return Err(e),
            _ => {}
        }
    }
    if self.metadata(path).await?.is_dir() {
        Ok(())
    } else {
        Err(ZeroFsError::NotADirectory {
            path: display(path),
        })
    }
}
/// Unlinks the entry at `path` from its parent directory, passing flags of 0.
pub async fn remove_file(&self, path: impl AsRef<Path>) -> Result<(), ZeroFsError> {
    let path = path.as_ref();
    let pd = display(path);
    let (parent, leaf) = self.parent_of(path, &pd).await?;
    let client = &self.session.client;
    let removed = client.unlinkat(parent.fid(), leaf, 0).await;
    removed.ctx(&pd)
}
/// Unlinks the directory at `path` from its parent, passing `AT_REMOVEDIR`.
pub async fn remove_dir(&self, path: impl AsRef<Path>) -> Result<(), ZeroFsError> {
    let path = path.as_ref();
    let pd = display(path);
    let (parent, leaf) = self.parent_of(path, &pd).await?;
    let client = &self.session.client;
    let flags = libc::AT_REMOVEDIR as u32;
    let removed = client.unlinkat(parent.fid(), leaf, flags).await;
    removed.ctx(&pd)
}
/// Recursively deletes the directory at `path` and everything beneath it.
///
/// The directory handle opened for the traversal is always closed before the outcome
/// of emptying it is examined.
///
/// # Errors
/// Returns `ZeroFsError::InvalidArgument` when `path` has no components (the attach
/// root); other failures are propagated.
pub async fn remove_dir_all(&self, path: impl AsRef<Path>) -> Result<(), ZeroFsError> {
    let path = path.as_ref();
    self.session.check_open()?;
    let targets_root = components(path)?.is_empty();
    if targets_root {
        return Err(ZeroFsError::InvalidArgument {
            message: "refusing to remove the attach root".to_string(),
        });
    }
    let dir = self.open_dir(path).await?;
    let emptied = remove_dir_contents(&dir).await;
    dir.close().await;
    match emptied {
        Ok(()) => self.remove_dir(path).await,
        Err(e) => Err(e),
    }
}
/// Renames `from` to `to`, each resolved relative to its own parent directory.
///
/// Errors from the rename request itself are annotated with the source path.
pub async fn rename(
    &self,
    from: impl AsRef<Path>,
    to: impl AsRef<Path>,
) -> Result<(), ZeroFsError> {
    let from = from.as_ref();
    let to = to.as_ref();
    let fd = display(from);
    let td = display(to);
    let (src_dir, src_name) = self.parent_of(from, &fd).await?;
    let (dst_dir, dst_name) = self.parent_of(to, &td).await?;
    let client = &self.session.client;
    client
        .renameat(src_dir.fid(), src_name, dst_dir.fid(), dst_name)
        .await
        .ctx(&fd)
}
/// Creates a hard link at `link` that refers to the node at `original`.
///
/// The link's parent directory is walked first, then `original`; the returned metadata
/// is whatever `link_at` reports.
pub async fn hard_link(
    &self,
    original: impl AsRef<Path>,
    link: impl AsRef<Path>,
) -> Result<Metadata, ZeroFsError> {
    let original = original.as_ref();
    let link = link.as_ref();
    let od = display(original);
    let ld = display(link);
    let (link_dir, link_name) = self.parent_of(link, &ld).await?;
    let source_names = components(original)?;
    let (source, _) = self.session.walk(&source_names, &od).await?;
    self.session
        .link_at(link_dir.fid(), source.fid(), link_name, &ld)
        .await
}
/// Creates a symbolic link at `link_path` whose contents are the raw bytes of `target`.
///
/// `target` is stored as given; it is not validated or resolved here.
pub async fn symlink(
    &self,
    target: impl AsRef<Path>,
    link_path: impl AsRef<Path>,
) -> Result<Metadata, ZeroFsError> {
    let link_path = link_path.as_ref();
    let ld = display(link_path);
    let (parent, leaf) = self.parent_of(link_path, &ld).await?;
    let target_bytes = target.as_ref().as_os_str().as_bytes();
    self.session
        .symlink_at(parent.fid(), leaf, target_bytes, &ld)
        .await
}
/// Reads the target stored in the symbolic link at `path` and returns it as a path.
pub async fn read_link(&self, path: impl AsRef<Path>) -> Result<PathBuf, ZeroFsError> {
    let path = path.as_ref();
    let pd = display(path);
    let session = &self.session;
    session.check_open()?;
    let names = components(path)?;
    let (guard, _) = session.walk(&names, &pd).await?;
    let raw = session.client.readlink(guard.fid()).await.ctx(&pd)?;
    Ok(OsString::from_vec(raw).into())
}
/// Creates a node of the given `kind` and `mode` at `path`, relative to its parent directory.
pub async fn mknod(
    &self,
    path: impl AsRef<Path>,
    kind: NodeKind,
    mode: u32,
) -> Result<Metadata, ZeroFsError> {
    let path = path.as_ref();
    let pd = display(path);
    let (parent, leaf) = self.parent_of(path, &pd).await?;
    let parent_fid = parent.fid();
    self.session
        .mknod_at(parent_fid, leaf, kind, mode, &pd)
        .await
}
/// Lists every entry of the directory at `path`.
///
/// Batches are fetched until an empty one is returned. The directory handle is closed
/// whether or not listing succeeded.
pub async fn read_dir(&self, path: impl AsRef<Path>) -> Result<Vec<DirEntry>, ZeroFsError> {
    let dir = self.open_dir(path).await?;
    let mut entries = Vec::new();
    let mut failure = None;
    loop {
        match dir.next_batch(None).await {
            Ok(batch) if batch.is_empty() => break,
            Ok(batch) => entries.extend(batch),
            Err(e) => {
                failure = Some(e);
                break;
            }
        }
    }
    dir.close().await;
    match failure {
        Some(e) => Err(e),
        None => Ok(entries),
    }
}
/// Opens the directory at `path` for batched iteration.
///
/// # Errors
/// Returns `ZeroFsError::NotADirectory` when the walk produced a stat whose type is
/// not a directory. If the walk produced no stat, no type check is made here.
pub async fn open_dir(&self, path: impl AsRef<Path>) -> Result<Arc<Dir>, ZeroFsError> {
    let path = path.as_ref();
    let pd = display(path);
    self.session.check_open()?;
    let names = components(path)?;
    let (guard, stat) = self.session.walk(&names, &pd).await?;
    let known_non_dir = stat
        .as_ref()
        .is_some_and(|st| FileType::from_mode(st.mode) != FileType::Dir);
    if known_non_dir {
        return Err(ZeroFsError::NotADirectory { path: pd });
    }
    let session = Arc::clone(&self.session);
    let shown = display_path(&names);
    Ok(Dir::new(session, guard, shown))
}
/// Opens `path` according to `opts` and returns a file handle sharing this client's session.
pub async fn open(
    &self,
    path: impl AsRef<Path>,
    opts: OpenOptions,
) -> Result<Arc<File>, ZeroFsError> {
    let path = path.as_ref();
    let pd = display(path);
    self.session.check_open()?;
    let guard = self.open_relative_path(path, &pd, &opts).await?;
    let session = Arc::clone(&self.session);
    Ok(File::new(session, guard, pd))
}
/// Opens `path` read-write with the create and truncate options set.
pub async fn create(&self, path: impl AsRef<Path>) -> Result<Arc<File>, ZeroFsError> {
    let opts = OpenOptions::read_write().create(true).truncate(true);
    self.open(path, opts).await
}
/// Resolves `path` from the attach root, following symbolic links on the client side.
///
/// Returns the component names of the resolved location (empty for the root itself)
/// together with the `Stat` of the node that was reached.
///
/// # Errors
/// Returns `ZeroFsError::TooManySymlinks` once more than `MAX_SYMLINK_HOPS` links have
/// been followed; errors from walking, stat'ing, or reading a link are propagated.
async fn resolve(&self, path: &Path, pd: &str) -> Result<(Vec<Vec<u8>>, Stat), ZeroFsError> {
let session = &self.session;
let literal: Vec<&[u8]> = components(path)?;
// Fast path: a single walk of the literal components. It is used only when the walk
// succeeds and the final node is not a symlink; any error is discarded here and the
// component-by-component path below runs instead.
// NOTE(review): the literal components are returned verbatim, so this presumably relies
// on the walk failing when an intermediate component is a symlink — confirm.
if let Ok((_guard, stat)) = session.walk_stat_from(session.root_fid, &literal, pd).await
&& FileType::from_mode(stat.mode) != FileType::Symlink
{
return Ok((literal.iter().map(|c| c.to_vec()).collect(), stat));
}
// `todo` holds the components still to be processed; `stack` holds the components of
// the location resolved so far.
let mut todo: VecDeque<Vec<u8>> = literal.iter().map(|c| c.to_vec()).collect();
let mut stack: Vec<Vec<u8>> = Vec::new();
let mut hops = 0u32;
// Start from the result of a zero-component walk and its stat.
let (mut cur, _) = session.walk(&[], pd).await?;
let mut cur_stat = session.stat_fid(cur.fid(), pd).await?;
while let Some(name) = todo.pop_front() {
if name == b".." {
// Go up one level by dropping the last resolved component and re-walking the
// shortened stack from the root; popping an empty stack is a no-op.
stack.pop();
let refs: Vec<&[u8]> = stack.iter().map(|c| c.as_slice()).collect();
let (guard, stat) = session.walk_stat_from(session.root_fid, &refs, pd).await?;
cur = guard;
cur_stat = stat;
continue;
}
let (guard, stat) = session
.walk_stat_from(cur.fid(), &[name.as_slice()], pd)
.await?;
if FileType::from_mode(stat.mode) == FileType::Symlink {
hops += 1;
if hops > MAX_SYMLINK_HOPS {
return Err(ZeroFsError::TooManySymlinks {
path: pd.to_string(),
});
}
let target = session.client.readlink(guard.fid()).await.ctx(pd)?;
// An absolute target restarts resolution at the root. For a relative target `cur`
// is left untouched, i.e. still at the directory the link was looked up in.
if target.first() == Some(&b'/') {
stack.clear();
let (root_clone, _) = session.walk(&[], pd).await?;
cur = root_clone;
cur_stat = session.stat_fid(cur.fid(), pd).await?;
}
// Queue the target's components ahead of the remaining input. Iterating in reverse
// while pushing to the front keeps them in their original order; empty and `.`
// components are dropped.
for comp in target
.split(|&b| b == b'/')
.filter(|c| !c.is_empty() && *c != b".")
.rev()
{
todo.push_front(comp.to_vec());
}
} else {
// Ordinary component: descend into it.
stack.push(name);
cur = guard;
cur_stat = stat;
}
}
Ok((stack, cur_stat))
}
}
/// Deletes every entry inside `dir`, recursing into subdirectories, and leaves `dir`
/// itself in place.
///
/// Returns a boxed future rather than being an `async fn` because the body awaits a
/// call to itself.
fn remove_dir_contents<'a>(
dir: &'a Dir,
) -> std::pin::Pin<Box<dyn Future<Output = Result<(), ZeroFsError>> + Send + 'a>> {
Box::pin(async move {
loop {
// Restart the listing before every batch; presumably because the removals done for
// the previous batch invalidate the earlier read position — confirm against `Dir`.
dir.rewind().await?;
let batch = dir.next_batch(None).await?;
if batch.is_empty() {
return Ok(());
}
for entry in batch {
if entry.file_type == FileType::Dir {
// Empty the child first, close its handle whether or not that succeeded, and only
// then report the error or remove the now-empty directory.
let child = dir.open_dir_at(&entry.name_bytes).await?;
let result = remove_dir_contents(&child).await;
child.close().await;
result?;
dir.remove_dir_at(&entry.name_bytes).await?;
} else {
// Every non-directory entry type is removed as a file.
dir.remove_file_at(&entry.name_bytes).await?;
}
}
}
})
}
/// Opens the transport named by `target` and returns the connected 9P client.
///
/// Target forms:
/// - `unix:<path>` or `unix://<path>`: Unix socket at `<path>`.
/// - A target (after an optional `tcp://` prefix) starting with `/` or `.`: Unix socket
///   at that path.
/// - Anything else (after an optional `tcp://` prefix): a TCP host, resolved by
///   `resolve_addr`.
///
/// # Errors
/// Transport failures are reported as `ZeroFsError::ConnectFailed`.
async fn dial(target: &str, msize: u32) -> Result<Arc<NinePClient>, ZeroFsError> {
    let hostport = target.strip_prefix("tcp://").unwrap_or(target);
    let socket_path = if let Some(rest) = target.strip_prefix("unix:") {
        Some(rest.strip_prefix("//").unwrap_or(rest))
    } else if hostport.starts_with(['/', '.']) {
        Some(hostport)
    } else {
        None
    };
    if let Some(path) = socket_path {
        return NinePClient::connect_unix(path, msize)
            .await
            .map_err(|e| ZeroFsError::ConnectFailed {
                message: format!("9P unix socket {path}: {e}"),
            });
    }
    let addr = resolve_addr(hostport).await?;
    NinePClient::connect_tcp(addr, msize)
        .await
        .map_err(|e| ZeroFsError::ConnectFailed {
            message: format!("9P server {addr}: {e}"),
        })
}
/// Turns a `host[:port]` string into a socket address, defaulting the port to
/// `DEFAULT_9P_PORT`.
///
/// Accepted forms, tried in order:
/// 1. A complete socket address literal (`1.2.3.4:5564`, `[::1]:5564`).
/// 2. A bare IP literal with no port (`1.2.3.4`, `::1`, `[::1]`).
/// 3. A host name, with or without `:port`, resolved via `tokio::net::lookup_host`; the
///    first address returned is used.
///
/// # Errors
/// Returns `ZeroFsError::ConnectFailed` when resolution fails or yields no addresses.
async fn resolve_addr(s: &str) -> Result<std::net::SocketAddr, ZeroFsError> {
    if let Ok(addr) = s.parse::<std::net::SocketAddr>() {
        return Ok(addr);
    }
    // A bare IPv6 literal contains ':' yet carries no port, so it has to be recognised
    // before the "contains a colon => port already present" heuristic below. Otherwise
    // `::1` would be handed to the resolver unchanged and be rejected or mis-split at its
    // last colon.
    let bracketed = s
        .strip_prefix('[')
        .and_then(|rest| rest.strip_suffix(']'));
    let literal = match bracketed {
        Some(inner) => inner
            .parse::<std::net::Ipv6Addr>()
            .ok()
            .map(std::net::IpAddr::V6),
        None => s.parse::<std::net::IpAddr>().ok(),
    };
    if let Some(ip) = literal {
        return Ok(std::net::SocketAddr::new(ip, DEFAULT_9P_PORT));
    }
    let with_port = if s.contains(':') {
        s.to_string()
    } else {
        format!("{s}:{DEFAULT_9P_PORT}")
    };
    tokio::net::lookup_host(&with_port)
        .await
        .map_err(|e| ZeroFsError::ConnectFailed {
            message: format!("resolving {with_port}: {e}"),
        })?
        .next()
        .ok_or_else(|| ZeroFsError::ConnectFailed {
            message: format!("no addresses resolved for {with_port}"),
        })
}