1use crate::dir::Dir;
2use crate::error::{ClientResultExt, ZeroFsError};
3use crate::file::File;
4use crate::path::{components, display, display_path, split_parent};
5use crate::session::{FidGuard, Session};
6use crate::types::{
7 Capabilities, ConnectOptions, DirEntry, FileType, Metadata, NodeKind, OpenOptions, SetAttrs,
8 SetTime, StatFs,
9};
10use bytes::{Bytes, BytesMut};
11use ninep_client::{NOFID, NinePClient};
12use ninep_proto::Stat;
13use std::collections::VecDeque;
14use std::ffi::OsString;
15use std::os::unix::ffi::{OsStrExt, OsStringExt};
16use std::path::{Path, PathBuf};
17use std::sync::Arc;
18use std::sync::atomic::Ordering;
19use std::time::Duration;
20
/// Port appended to a TCP target that does not carry one of its own.
const DEFAULT_9P_PORT: u16 = 5564;

/// Maximum number of symlinks followed while resolving one path; going past
/// it fails the resolution with `ZeroFsError::TooManySymlinks`.
const MAX_SYMLINK_HOPS: u32 = 40;
25
/// Handle to an attached 9P session; every operation goes through the shared
/// [`Session`].
pub struct Client {
    /// Session state, reference-counted so it can also be handed to the
    /// `Dir` and `File` handles this client creates.
    session: Arc<Session>,
}
37
38impl std::fmt::Debug for Client {
39 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40 f.debug_struct("Client")
41 .field("closed", &self.session.closed.load(Ordering::Relaxed))
42 .finish_non_exhaustive()
43 }
44}
45
46impl Client {
47 pub async fn connect(target: &str) -> Result<Arc<Client>, ZeroFsError> {
51 Self::connect_with(target, ConnectOptions::default()).await
52 }
53
54 pub async fn connect_with(
56 target: &str,
57 opts: ConnectOptions,
58 ) -> Result<Arc<Client>, ZeroFsError> {
59 let fut = Self::establish(target, &opts);
60 match opts.connect_timeout_ms {
61 Some(ms) => match tokio::time::timeout(Duration::from_millis(ms as u64), fut).await {
62 Ok(result) => result,
63 Err(_) => Err(ZeroFsError::ConnectFailed {
64 message: format!("connecting to {target}: timed out after {ms} ms"),
65 }),
66 },
67 None => fut.await,
68 }
69 }
70
71 async fn establish(target: &str, opts: &ConnectOptions) -> Result<Arc<Client>, ZeroFsError> {
72 let client = dial(target, opts.msize).await?;
73 let uid = opts.uid.unwrap_or_else(|| unsafe { libc::geteuid() });
74 let gid = opts.gid.unwrap_or_else(|| unsafe { libc::getegid() });
75 let uname = match &opts.uname {
76 Some(u) => u.clone(),
77 None => std::env::var("USER").unwrap_or_else(|_| uid.to_string()),
78 };
79 let root_fid = client.alloc_fid();
80 client
81 .attach(root_fid, NOFID, &uname, &opts.aname, uid)
82 .await
83 .map_err(|e| ZeroFsError::ConnectFailed {
84 message: format!("attach to {target} failed: {e}"),
85 })?;
86 let session = Session::new(client, root_fid, gid);
87 Ok(Arc::new(Client { session }))
88 }
89
90 pub fn capabilities(&self) -> Capabilities {
93 let c = &self.session.client;
94 Capabilities {
95 extensions_v1: c.extensions_enabled(),
96 extensions_v2: c.extensions_v2_enabled(),
97 msize: c.msize(),
98 max_read_chunk: c.max_io(),
99 max_write_chunk: c.max_write_payload(),
100 }
101 }
102
103 #[doc(hidden)]
107 pub fn outstanding_fids(&self) -> usize {
108 self.session.client.outstanding_fids()
109 }
110
111 pub async fn close(&self) {
117 if self.session.closed.swap(true, Ordering::AcqRel) {
118 return;
119 }
120 self.session.enqueue_clunk(self.session.root_fid);
121 }
122
123 pub async fn read(&self, path: impl AsRef<Path>) -> Result<Bytes, ZeroFsError> {
126 let path = path.as_ref();
127 let pd = display(path);
128 let (guard, stat) = self.open_read(path, &pd).await?;
129 let max = self.session.client.max_io().max(1);
130 let first = self
132 .session
133 .client
134 .read_bytes(guard.fid(), 0, max)
135 .await
136 .ctx(&pd)?;
137 if (first.len() as u32) < max {
138 return Ok(first);
139 }
140 let cap = stat
143 .as_ref()
144 .map_or(0, |s| s.size as usize)
145 .min((max as usize).saturating_mul(2));
146 let mut out = BytesMut::with_capacity(cap);
147 out.extend_from_slice(&first);
148 loop {
149 let data = self
150 .session
151 .client
152 .read_bytes(guard.fid(), out.len() as u64, max)
153 .await
154 .ctx(&pd)?;
155 let got = data.len();
156 out.extend_from_slice(&data);
157 if (got as u32) < max {
158 return Ok(out.freeze());
159 }
160 }
161 }
162
163 pub async fn read_range(
165 &self,
166 path: impl AsRef<Path>,
167 offset: u64,
168 len: u32,
169 ) -> Result<Bytes, ZeroFsError> {
170 let path = path.as_ref();
171 let pd = display(path);
172 let (guard, _) = self.open_read(path, &pd).await?;
173 self.session
174 .client
175 .read_bytes(guard.fid(), offset, len)
176 .await
177 .ctx(&pd)
178 }
179
180 pub async fn write(&self, path: impl AsRef<Path>, data: &[u8]) -> Result<(), ZeroFsError> {
183 let path = path.as_ref();
184 let pd = display(path);
185 let opts = OpenOptions::write_only().create(true).truncate(true);
186 let guard = self.open_relative_path(path, &pd, &opts).await?;
187 self.session.write_all(guard.fid(), 0, data, &pd).await
188 }
189
190 pub async fn append(&self, path: impl AsRef<Path>, data: &[u8]) -> Result<u64, ZeroFsError> {
194 let path = path.as_ref();
195 let pd = display(path);
196 let opts = OpenOptions::write_only().create(true);
197 let guard = self.open_relative_path(path, &pd, &opts).await?;
198 let stat = self.session.stat_fid(guard.fid(), &pd).await?;
199 self.session
200 .write_all(guard.fid(), stat.size, data, &pd)
201 .await?;
202 Ok(stat.size)
203 }
204
205 async fn parent_of<'a>(
208 &self,
209 path: &'a Path,
210 pd: &str,
211 ) -> Result<(FidGuard, &'a [u8]), ZeroFsError> {
212 self.session.check_open()?;
213 let names = components(path)?;
214 let (parents, name) = split_parent(&names, pd)?;
215 let (guard, _) = self.session.walk(parents, pd).await?;
216 Ok((guard, name))
217 }
218
219 async fn open_read(
222 &self,
223 path: &Path,
224 pd: &str,
225 ) -> Result<(FidGuard, Option<Stat>), ZeroFsError> {
226 self.session.check_open()?;
227 let names = components(path)?;
228 let (guard, stat) = self.session.walk(&names, pd).await?;
229 self.session
230 .lopen(guard.fid(), libc::O_RDONLY as u32, pd)
231 .await?;
232 Ok((guard, stat))
233 }
234
235 async fn open_relative_path(
237 &self,
238 path: &Path,
239 pd: &str,
240 opts: &OpenOptions,
241 ) -> Result<FidGuard, ZeroFsError> {
242 let (dir_guard, name) = self.parent_of(path, pd).await?;
243 self.session
244 .open_relative(dir_guard.fid(), name, opts, pd)
245 .await
246 }
247
248 pub async fn stat(&self, path: impl AsRef<Path>) -> Result<Metadata, ZeroFsError> {
253 let path = path.as_ref();
254 let pd = display(path);
255 self.session.check_open()?;
256 let names = components(path)?;
257 let (_guard, stat) = self
258 .session
259 .walk_stat_from(self.session.root_fid, &names, &pd)
260 .await?;
261 Ok(Metadata::from_stat(&stat))
262 }
263
264 pub async fn metadata(&self, path: impl AsRef<Path>) -> Result<Metadata, ZeroFsError> {
267 let path = path.as_ref();
268 let pd = display(path);
269 self.session.check_open()?;
270 let (_, stat) = self.resolve(path, &pd).await?;
271 Ok(Metadata::from_stat(&stat))
272 }
273
274 pub async fn canonicalize(&self, path: impl AsRef<Path>) -> Result<PathBuf, ZeroFsError> {
278 let path = path.as_ref();
279 let pd = display(path);
280 self.session.check_open()?;
281 let (stack, _) = self.resolve(path, &pd).await?;
282 let mut buf = Vec::new();
283 for comp in &stack {
284 buf.push(b'/');
285 buf.extend_from_slice(comp);
286 }
287 if buf.is_empty() {
288 buf.push(b'/');
289 }
290 Ok(PathBuf::from(OsString::from_vec(buf)))
291 }
292
293 pub async fn exists(&self, path: impl AsRef<Path>) -> Result<bool, ZeroFsError> {
296 match self.stat(path).await {
297 Ok(_) => Ok(true),
298 Err(ZeroFsError::NotFound { .. }) => Ok(false),
299 Err(e) => Err(e),
300 }
301 }
302
303 pub async fn set_attr(
305 &self,
306 path: impl AsRef<Path>,
307 attrs: SetAttrs,
308 ) -> Result<Metadata, ZeroFsError> {
309 let path = path.as_ref();
310 let pd = display(path);
311 self.session.check_open()?;
312 let names = components(path)?;
313 let (guard, _) = self.session.walk(&names, &pd).await?;
314 let stat = self.session.setattr_fid(guard.fid(), &attrs, &pd).await?;
315 Ok(Metadata::from_stat(&stat))
316 }
317
318 pub async fn chmod(&self, path: impl AsRef<Path>, mode: u32) -> Result<Metadata, ZeroFsError> {
320 self.set_attr(
321 path,
322 SetAttrs {
323 mode: Some(mode),
324 ..Default::default()
325 },
326 )
327 .await
328 }
329
330 pub async fn chown(
332 &self,
333 path: impl AsRef<Path>,
334 uid: Option<u32>,
335 gid: Option<u32>,
336 ) -> Result<Metadata, ZeroFsError> {
337 self.set_attr(
338 path,
339 SetAttrs {
340 uid,
341 gid,
342 ..Default::default()
343 },
344 )
345 .await
346 }
347
348 pub async fn truncate(
350 &self,
351 path: impl AsRef<Path>,
352 size: u64,
353 ) -> Result<Metadata, ZeroFsError> {
354 self.set_attr(
355 path,
356 SetAttrs {
357 size: Some(size),
358 ..Default::default()
359 },
360 )
361 .await
362 }
363
364 pub async fn set_times(
366 &self,
367 path: impl AsRef<Path>,
368 atime: Option<SetTime>,
369 mtime: Option<SetTime>,
370 ) -> Result<Metadata, ZeroFsError> {
371 self.set_attr(
372 path,
373 SetAttrs {
374 atime,
375 mtime,
376 ..Default::default()
377 },
378 )
379 .await
380 }
381
382 pub async fn statfs(&self) -> Result<StatFs, ZeroFsError> {
384 self.session.check_open()?;
385 let r = self
386 .session
387 .client
388 .statfs(self.session.root_fid)
389 .await
390 .ctx("/")?;
391 Ok(StatFs::from_wire(&r))
392 }
393
394 pub async fn sync(&self) -> Result<(), ZeroFsError> {
398 self.session.check_open()?;
399 self.session
400 .client
401 .fsync(self.session.root_fid, 0)
402 .await
403 .ctx("/")
404 }
405
406 pub async fn create_dir(
408 &self,
409 path: impl AsRef<Path>,
410 mode: u32,
411 ) -> Result<Metadata, ZeroFsError> {
412 let path = path.as_ref();
413 let pd = display(path);
414 let (dir_guard, name) = self.parent_of(path, &pd).await?;
415 self.session
416 .mkdir_at(dir_guard.fid(), name, mode, &pd)
417 .await
418 }
419
420 pub async fn create_dir_all(
422 &self,
423 path: impl AsRef<Path>,
424 mode: u32,
425 ) -> Result<(), ZeroFsError> {
426 let path = path.as_ref();
427 self.session.check_open()?;
428 let names = components(path)?;
429 for depth in 1..=names.len() {
430 let prefix = &names[..depth];
431 let pd = display_path(prefix);
432 let (parents, name) = split_parent(prefix, &pd)?;
433 let (dir_guard, _) = self.session.walk(parents, &pd).await?;
434 match self
435 .session
436 .mkdir_at(dir_guard.fid(), name, mode, &pd)
437 .await
438 {
439 Ok(_) | Err(ZeroFsError::AlreadyExists { .. }) => {}
440 Err(e) => return Err(e),
441 }
442 }
443 if !names.is_empty() {
447 let meta = self.metadata(path).await?;
448 if !meta.is_dir() {
449 return Err(ZeroFsError::NotADirectory {
450 path: display(path),
451 });
452 }
453 }
454 Ok(())
455 }
456
457 pub async fn remove_file(&self, path: impl AsRef<Path>) -> Result<(), ZeroFsError> {
459 let path = path.as_ref();
460 let pd = display(path);
461 let (dir_guard, name) = self.parent_of(path, &pd).await?;
462 self.session
463 .client
464 .unlinkat(dir_guard.fid(), name, 0)
465 .await
466 .ctx(&pd)
467 }
468
469 pub async fn remove_dir(&self, path: impl AsRef<Path>) -> Result<(), ZeroFsError> {
471 let path = path.as_ref();
472 let pd = display(path);
473 let (dir_guard, name) = self.parent_of(path, &pd).await?;
474 self.session
475 .client
476 .unlinkat(dir_guard.fid(), name, libc::AT_REMOVEDIR as u32)
477 .await
478 .ctx(&pd)
479 }
480
481 pub async fn remove_dir_all(&self, path: impl AsRef<Path>) -> Result<(), ZeroFsError> {
484 let path = path.as_ref();
485 self.session.check_open()?;
486 if components(path)?.is_empty() {
487 return Err(ZeroFsError::InvalidArgument {
488 message: "refusing to remove the attach root".to_string(),
489 });
490 }
491 let dir = self.open_dir(path).await?;
492 let result = remove_dir_contents(&dir).await;
493 dir.close().await;
494 result?;
495 self.remove_dir(path).await
496 }
497
498 pub async fn rename(
501 &self,
502 from: impl AsRef<Path>,
503 to: impl AsRef<Path>,
504 ) -> Result<(), ZeroFsError> {
505 let (from, to) = (from.as_ref(), to.as_ref());
506 let (fd, td) = (display(from), display(to));
507 let (from_guard, from_name) = self.parent_of(from, &fd).await?;
508 let (to_guard, to_name) = self.parent_of(to, &td).await?;
509 self.session
510 .client
511 .renameat(from_guard.fid(), from_name, to_guard.fid(), to_name)
512 .await
513 .ctx(&fd)
514 }
515
516 pub async fn hard_link(
518 &self,
519 original: impl AsRef<Path>,
520 link: impl AsRef<Path>,
521 ) -> Result<Metadata, ZeroFsError> {
522 let (original, link) = (original.as_ref(), link.as_ref());
523 let (od, ld) = (display(original), display(link));
524 let (dir_guard, link_name) = self.parent_of(link, &ld).await?;
525 let orig_names = components(original)?;
526 let (orig_guard, _) = self.session.walk(&orig_names, &od).await?;
527 self.session
528 .link_at(dir_guard.fid(), orig_guard.fid(), link_name, &ld)
529 .await
530 }
531
532 pub async fn symlink(
535 &self,
536 target: impl AsRef<Path>,
537 link_path: impl AsRef<Path>,
538 ) -> Result<Metadata, ZeroFsError> {
539 let link_path = link_path.as_ref();
540 let ld = display(link_path);
541 let (dir_guard, name) = self.parent_of(link_path, &ld).await?;
542 self.session
543 .symlink_at(
544 dir_guard.fid(),
545 name,
546 target.as_ref().as_os_str().as_bytes(),
547 &ld,
548 )
549 .await
550 }
551
552 pub async fn read_link(&self, path: impl AsRef<Path>) -> Result<PathBuf, ZeroFsError> {
554 let path = path.as_ref();
555 let pd = display(path);
556 self.session.check_open()?;
557 let names = components(path)?;
558 let (guard, _) = self.session.walk(&names, &pd).await?;
559 let target = self.session.client.readlink(guard.fid()).await.ctx(&pd)?;
560 Ok(PathBuf::from(OsString::from_vec(target)))
561 }
562
563 pub async fn mknod(
566 &self,
567 path: impl AsRef<Path>,
568 kind: NodeKind,
569 mode: u32,
570 ) -> Result<Metadata, ZeroFsError> {
571 let path = path.as_ref();
572 let pd = display(path);
573 let (dir_guard, name) = self.parent_of(path, &pd).await?;
574 self.session
575 .mknod_at(dir_guard.fid(), name, kind, mode, &pd)
576 .await
577 }
578
579 pub async fn read_dir(&self, path: impl AsRef<Path>) -> Result<Vec<DirEntry>, ZeroFsError> {
582 let dir = self.open_dir(path).await?;
583 let mut out = Vec::new();
584 let result = loop {
585 match dir.next_batch(None).await {
586 Ok(batch) if batch.is_empty() => break Ok(out),
587 Ok(batch) => out.extend(batch),
588 Err(e) => break Err(e),
589 }
590 };
591 dir.close().await;
592 result
593 }
594
595 pub async fn open_dir(&self, path: impl AsRef<Path>) -> Result<Arc<Dir>, ZeroFsError> {
598 let path = path.as_ref();
599 let pd = display(path);
600 self.session.check_open()?;
601 let names = components(path)?;
602 let (guard, stat) = self.session.walk(&names, &pd).await?;
603 if let Some(stat) = &stat
604 && FileType::from_mode(stat.mode) != FileType::Dir
605 {
606 return Err(ZeroFsError::NotADirectory { path: pd });
607 }
608 Ok(Dir::new(
609 Arc::clone(&self.session),
610 guard,
611 display_path(&names),
612 ))
613 }
614
615 pub async fn open(
617 &self,
618 path: impl AsRef<Path>,
619 opts: OpenOptions,
620 ) -> Result<Arc<File>, ZeroFsError> {
621 let path = path.as_ref();
622 let pd = display(path);
623 self.session.check_open()?;
624 let guard = self.open_relative_path(path, &pd, &opts).await?;
625 Ok(File::new(Arc::clone(&self.session), guard, pd))
626 }
627
628 pub async fn create(&self, path: impl AsRef<Path>) -> Result<Arc<File>, ZeroFsError> {
630 self.open(path, OpenOptions::read_write().create(true).truncate(true))
631 .await
632 }
633
    /// Resolves `path` from the attach root, following symlinks on the client
    /// side, and returns the resolved component stack together with the stat
    /// of the node it ends on.
    ///
    /// Fails with `ZeroFsError::TooManySymlinks` once more than
    /// `MAX_SYMLINK_HOPS` symlinks have been followed. `pd` is the displayed
    /// path used as error context.
    async fn resolve(&self, path: &Path, pd: &str) -> Result<(Vec<Vec<u8>>, Stat), ZeroFsError> {
        let session = &self.session;

        let literal: Vec<&[u8]> = components(path)?;
        // Fast path: if the path walks as written and does not end on a
        // symlink, its components are returned unchanged. A walk error is not
        // reported here; it falls through to the step-by-step loop below.
        if let Ok((_guard, stat)) = session.walk_stat_from(session.root_fid, &literal, pd).await
            && FileType::from_mode(stat.mode) != FileType::Symlink
        {
            return Ok((literal.iter().map(|c| c.to_vec()).collect(), stat));
        }

        // Slow path. `todo` holds the components still to be consumed and
        // `stack` the components resolved so far; `cur`/`cur_stat` are the fid
        // guard and stat of the node `stack` currently names.
        let mut todo: VecDeque<Vec<u8>> = literal.iter().map(|c| c.to_vec()).collect();
        let mut stack: Vec<Vec<u8>> = Vec::new();
        let mut hops = 0u32;
        // An empty walk: presumably yields a fresh fid for the root (compare
        // `root_clone` below). NOTE(review): confirm against `Session::walk`.
        let (mut cur, _) = session.walk(&[], pd).await?;
        let mut cur_stat = session.stat_fid(cur.fid(), pd).await?;

        while let Some(name) = todo.pop_front() {
            if name == b".." {
                // Drop the last resolved component (a no-op on an empty stack,
                // so `..` at the root stays at the root) and re-walk from the
                // root to the shortened stack.
                stack.pop();
                let refs: Vec<&[u8]> = stack.iter().map(|c| c.as_slice()).collect();
                let (guard, stat) = session.walk_stat_from(session.root_fid, &refs, pd).await?;
                cur = guard;
                cur_stat = stat;
                continue;
            }

            // Step one component down from the current node.
            let (guard, stat) = session
                .walk_stat_from(cur.fid(), &[name.as_slice()], pd)
                .await?;
            if FileType::from_mode(stat.mode) == FileType::Symlink {
                hops += 1;
                if hops > MAX_SYMLINK_HOPS {
                    return Err(ZeroFsError::TooManySymlinks {
                        path: pd.to_string(),
                    });
                }
                let target = session.client.readlink(guard.fid()).await.ctx(pd)?;
                // An absolute target restarts resolution at the root; a
                // relative one continues from the current node.
                if target.first() == Some(&b'/') {
                    stack.clear();
                    let (root_clone, _) = session.walk(&[], pd).await?;
                    cur = root_clone;
                    cur_stat = session.stat_fid(cur.fid(), pd).await?;
                }
                // Queue the target's components ahead of the remaining ones.
                // They are pushed to the front in reverse so they come back
                // out in their original order; empty and `.` components are
                // skipped.
                for comp in target
                    .split(|&b| b == b'/')
                    .filter(|c| !c.is_empty() && *c != b".")
                    .rev()
                {
                    todo.push_front(comp.to_vec());
                }
            } else {
                // Ordinary component: record it and advance to it.
                stack.push(name);
                cur = guard;
                cur_stat = stat;
            }
        }

        Ok((stack, cur_stat))
    }
705}
706
707fn remove_dir_contents<'a>(
710 dir: &'a Dir,
711) -> std::pin::Pin<Box<dyn Future<Output = Result<(), ZeroFsError>> + Send + 'a>> {
712 Box::pin(async move {
713 loop {
714 dir.rewind().await?;
715 let batch = dir.next_batch(None).await?;
716 if batch.is_empty() {
717 return Ok(());
718 }
719 for entry in batch {
720 if entry.file_type == FileType::Dir {
721 let child = dir.open_dir_at(&entry.name_bytes).await?;
722 let result = remove_dir_contents(&child).await;
723 child.close().await;
724 result?;
725 dir.remove_dir_at(&entry.name_bytes).await?;
726 } else {
727 dir.remove_file_at(&entry.name_bytes).await?;
728 }
729 }
730 }
731 })
732}
733
734async fn dial(target: &str, msize: u32) -> Result<Arc<NinePClient>, ZeroFsError> {
736 let connect_failed = |message: String| ZeroFsError::ConnectFailed { message };
737
738 if let Some(rest) = target.strip_prefix("unix:") {
739 let path = rest.strip_prefix("//").unwrap_or(rest);
740 return NinePClient::connect_unix(path, msize)
741 .await
742 .map_err(|e| connect_failed(format!("9P unix socket {path}: {e}")));
743 }
744
745 let hostport = target.strip_prefix("tcp://").unwrap_or(target);
746
747 if hostport.starts_with('/') || hostport.starts_with('.') {
749 return NinePClient::connect_unix(hostport, msize)
750 .await
751 .map_err(|e| connect_failed(format!("9P unix socket {hostport}: {e}")));
752 }
753
754 let addr = resolve_addr(hostport).await?;
755 NinePClient::connect_tcp(addr, msize)
756 .await
757 .map_err(|e| connect_failed(format!("9P server {addr}: {e}")))
758}
759
760async fn resolve_addr(s: &str) -> Result<std::net::SocketAddr, ZeroFsError> {
761 if let Ok(addr) = s.parse::<std::net::SocketAddr>() {
762 return Ok(addr);
763 }
764 let with_port = if s.contains(':') {
765 s.to_string()
766 } else {
767 format!("{s}:{DEFAULT_9P_PORT}")
768 };
769 tokio::net::lookup_host(&with_port)
770 .await
771 .map_err(|e| ZeroFsError::ConnectFailed {
772 message: format!("resolving {with_port}: {e}"),
773 })?
774 .next()
775 .ok_or_else(|| ZeroFsError::ConnectFailed {
776 message: format!("no addresses resolved for {with_port}"),
777 })
778}