Skip to main content

zerofs_client/
client.rs

1use crate::dir::Dir;
2use crate::error::{ClientResultExt, ZeroFsError};
3use crate::file::File;
4use crate::path::{components, display, display_path, split_parent};
5use crate::session::{FidGuard, Session};
6use crate::types::{
7    Capabilities, ConnectOptions, DirEntry, FileType, Metadata, NodeKind, OpenOptions, SetAttrs,
8    SetTime, StatFs,
9};
10use bytes::{Bytes, BytesMut};
11use ninep_client::{NOFID, NinePClient};
12use ninep_proto::Stat;
13use std::collections::VecDeque;
14use std::ffi::OsString;
15use std::os::unix::ffi::{OsStrExt, OsStringExt};
16use std::path::{Path, PathBuf};
17use std::sync::Arc;
18use std::sync::atomic::Ordering;
19use std::time::Duration;
20
/// TCP port used when a host target names no port of its own.
const DEFAULT_9P_PORT: u16 = 5564;

/// Symlink resolution cap, mirroring Linux's SYMLOOP_MAX headroom. Exceeding
/// it makes resolution fail with `TooManySymlinks`.
const MAX_SYMLINK_HOPS: u32 = 40;
25
/// One ZeroFS session, one identity. Share via `Arc`; every method takes
/// `&self` and is safe to call concurrently. The underlying connection
/// reconnects transparently, blocking calls through outages; bound waits with
/// your async runtime's timeout facilities (every future is cancel-safe).
///
/// Paths are bytes, as on POSIX and the 9P wire: every path parameter is
/// `impl AsRef<Path>`, so `&str`, `PathBuf`, and `OsStr::from_bytes(..)` for
/// non-UTF-8 names all work.
pub struct Client {
    /// Shared session state. `File` and `Dir` handles opened through this
    /// client are handed their own `Arc` clone of it.
    session: Arc<Session>,
}
37
38impl std::fmt::Debug for Client {
39    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40        f.debug_struct("Client")
41            .field("closed", &self.session.closed.load(Ordering::Relaxed))
42            .finish_non_exhaustive()
43    }
44}
45
46impl Client {
47    /// Connect with defaults. Targets: `"unix:/sock"`, `"tcp://host:port"`,
48    /// `"host:port"`, `"host"` (port 5564), or a bare filesystem path (unix
49    /// socket).
50    pub async fn connect(target: &str) -> Result<Arc<Client>, ZeroFsError> {
51        Self::connect_with(target, ConnectOptions::default()).await
52    }
53
54    /// Connect with explicit identity, timeout, and tuning.
55    pub async fn connect_with(
56        target: &str,
57        opts: ConnectOptions,
58    ) -> Result<Arc<Client>, ZeroFsError> {
59        let fut = Self::establish(target, &opts);
60        match opts.connect_timeout_ms {
61            Some(ms) => match tokio::time::timeout(Duration::from_millis(ms as u64), fut).await {
62                Ok(result) => result,
63                Err(_) => Err(ZeroFsError::ConnectFailed {
64                    message: format!("connecting to {target}: timed out after {ms} ms"),
65                }),
66            },
67            None => fut.await,
68        }
69    }
70
71    async fn establish(target: &str, opts: &ConnectOptions) -> Result<Arc<Client>, ZeroFsError> {
72        let client = dial(target, opts.msize).await?;
73        let uid = opts.uid.unwrap_or_else(|| unsafe { libc::geteuid() });
74        let gid = opts.gid.unwrap_or_else(|| unsafe { libc::getegid() });
75        let uname = match &opts.uname {
76            Some(u) => u.clone(),
77            None => std::env::var("USER").unwrap_or_else(|_| uid.to_string()),
78        };
79        let root_fid = client.alloc_fid();
80        client
81            .attach(root_fid, NOFID, &uname, &opts.aname, uid)
82            .await
83            .map_err(|e| ZeroFsError::ConnectFailed {
84                message: format!("attach to {target} failed: {e}"),
85            })?;
86        let session = Session::new(client, root_fid, gid);
87        Ok(Arc::new(Client { session }))
88    }
89
90    /// Snapshot of currently negotiated session properties (may change across
91    /// transparent reconnects).
92    pub fn capabilities(&self) -> Capabilities {
93        let c = &self.session.client;
94        Capabilities {
95            extensions_v1: c.extensions_enabled(),
96            extensions_v2: c.extensions_v2_enabled(),
97            msize: c.msize(),
98            max_read_chunk: c.max_io(),
99            max_write_chunk: c.max_write_payload(),
100        }
101    }
102
103    /// Number of server fids this client currently holds: the root, open
104    /// `File`/`Dir` handles, and any in-flight operations. A diagnostic hook for
105    /// leak tests, not part of the stable surface.
106    #[doc(hidden)]
107    pub fn outstanding_fids(&self) -> usize {
108        self.session.client.outstanding_fids()
109    }
110
111    /// Mark the client closed (later calls return `Closed`), then hand the root
112    /// fid to the janitor for a background clunk + recycle. Always succeeds,
113    /// idempotent, and never blocks (no await), so a cancelled close still
114    /// reclaims the root fid. Outstanding `File`/`Dir` handles keep working until
115    /// individually closed.
116    pub async fn close(&self) {
117        if self.session.closed.swap(true, Ordering::AcqRel) {
118            return;
119        }
120        self.session.enqueue_clunk(self.session.root_fid);
121    }
122
123    /// Read the entire file into memory. Returns [`Bytes`]: a whole file that
124    /// fits in one round trip comes back with no copy.
125    pub async fn read(&self, path: impl AsRef<Path>) -> Result<Bytes, ZeroFsError> {
126        let path = path.as_ref();
127        let pd = display(path);
128        let (guard, stat) = self.open_read(path, &pd).await?;
129        let max = self.session.client.max_io().max(1);
130        // A short first chunk means the whole file fit; hand it back uncopied.
131        let first = self
132            .session
133            .client
134            .read_bytes(guard.fid(), 0, max)
135            .await
136            .ctx(&pd)?;
137        if (first.len() as u32) < max {
138            return Ok(first);
139        }
140        // Cap the up-front reservation: `stat.size` is server-reported and may
141        // be wildly large (or hostile); the loop grows `out` as it fills.
142        let cap = stat
143            .as_ref()
144            .map_or(0, |s| s.size as usize)
145            .min((max as usize).saturating_mul(2));
146        let mut out = BytesMut::with_capacity(cap);
147        out.extend_from_slice(&first);
148        loop {
149            let data = self
150                .session
151                .client
152                .read_bytes(guard.fid(), out.len() as u64, max)
153                .await
154                .ctx(&pd)?;
155            let got = data.len();
156            out.extend_from_slice(&data);
157            if (got as u32) < max {
158                return Ok(out.freeze());
159            }
160        }
161    }
162
163    /// Read up to `len` bytes at `offset`; a shorter result means EOF.
164    pub async fn read_range(
165        &self,
166        path: impl AsRef<Path>,
167        offset: u64,
168        len: u32,
169    ) -> Result<Bytes, ZeroFsError> {
170        let path = path.as_ref();
171        let pd = display(path);
172        let (guard, _) = self.open_read(path, &pd).await?;
173        self.session
174            .client
175            .read_bytes(guard.fid(), offset, len)
176            .await
177            .ctx(&pd)
178    }
179
180    /// Create-or-truncate `path` (mode 0o644) and write all of `data`
181    /// (composed client-side: open/create + truncate + write).
182    pub async fn write(&self, path: impl AsRef<Path>, data: &[u8]) -> Result<(), ZeroFsError> {
183        let path = path.as_ref();
184        let pd = display(path);
185        let opts = OpenOptions::write_only().create(true).truncate(true);
186        let guard = self.open_relative_path(path, &pd, &opts).await?;
187        self.session.write_all(guard.fid(), 0, data, &pd).await
188    }
189
190    /// Append `data` at end-of-file (open-or-create + fstat + positioned
191    /// write, composed client-side); returns the offset where the data landed.
192    /// Last-writer-wins under concurrent appenders.
193    pub async fn append(&self, path: impl AsRef<Path>, data: &[u8]) -> Result<u64, ZeroFsError> {
194        let path = path.as_ref();
195        let pd = display(path);
196        let opts = OpenOptions::write_only().create(true);
197        let guard = self.open_relative_path(path, &pd, &opts).await?;
198        let stat = self.session.stat_fid(guard.fid(), &pd).await?;
199        self.session
200            .write_all(guard.fid(), stat.size, data, &pd)
201            .await?;
202        Ok(stat.size)
203    }
204
205    /// Shared namespace-op preamble: closed check, then walk to the parent of
206    /// `path`, returning the parent's guard and the final name component.
207    async fn parent_of<'a>(
208        &self,
209        path: &'a Path,
210        pd: &str,
211    ) -> Result<(FidGuard, &'a [u8]), ZeroFsError> {
212        self.session.check_open()?;
213        let names = components(path)?;
214        let (parents, name) = split_parent(&names, pd)?;
215        let (guard, _) = self.session.walk(parents, pd).await?;
216        Ok((guard, name))
217    }
218
219    /// Walk to `path` and open it read-only, returning the opened guard plus
220    /// the stat when the walk made it free.
221    async fn open_read(
222        &self,
223        path: &Path,
224        pd: &str,
225    ) -> Result<(FidGuard, Option<Stat>), ZeroFsError> {
226        self.session.check_open()?;
227        let names = components(path)?;
228        let (guard, stat) = self.session.walk(&names, pd).await?;
229        self.session
230            .lopen(guard.fid(), libc::O_RDONLY as u32, pd)
231            .await?;
232        Ok((guard, stat))
233    }
234
235    /// Walk to the parent and open/create the final component with `opts`.
236    async fn open_relative_path(
237        &self,
238        path: &Path,
239        pd: &str,
240        opts: &OpenOptions,
241    ) -> Result<FidGuard, ZeroFsError> {
242        let (dir_guard, name) = self.parent_of(path, pd).await?;
243        self.session
244            .open_relative(dir_guard.fid(), name, opts, pd)
245            .await
246    }
247
248    /// Report the entry at `path` without following symlinks; anywhere: a
249    /// path THROUGH a symlinked directory fails `NotADirectory` (9P walks are
250    /// literal; this applies to every path-taking method). [`Self::metadata`]
251    /// and [`Self::canonicalize`] are the only resolvers.
252    pub async fn stat(&self, path: impl AsRef<Path>) -> Result<Metadata, ZeroFsError> {
253        let path = path.as_ref();
254        let pd = display(path);
255        self.session.check_open()?;
256        let names = components(path)?;
257        let (_guard, stat) = self
258            .session
259            .walk_stat_from(self.session.root_fid, &names, &pd)
260            .await?;
261        Ok(Metadata::from_stat(&stat))
262    }
263
264    /// Like [`Self::stat`] but resolves symlinks (final AND intermediate
265    /// components) client-side (readlink + re-walk), capped at 40 hops.
266    pub async fn metadata(&self, path: impl AsRef<Path>) -> Result<Metadata, ZeroFsError> {
267        let path = path.as_ref();
268        let pd = display(path);
269        self.session.check_open()?;
270        let (_, stat) = self.resolve(path, &pd).await?;
271        Ok(Metadata::from_stat(&stat))
272    }
273
274    /// Resolve every symlink in `path` (40-hop cap) and return the canonical
275    /// path, for use with any other method. Lossless: paths are bytes, so a
276    /// non-UTF-8 component survives the round trip.
277    pub async fn canonicalize(&self, path: impl AsRef<Path>) -> Result<PathBuf, ZeroFsError> {
278        let path = path.as_ref();
279        let pd = display(path);
280        self.session.check_open()?;
281        let (stack, _) = self.resolve(path, &pd).await?;
282        let mut buf = Vec::new();
283        for comp in &stack {
284            buf.push(b'/');
285            buf.extend_from_slice(comp);
286        }
287        if buf.is_empty() {
288            buf.push(b'/');
289        }
290        Ok(PathBuf::from(OsString::from_vec(buf)))
291    }
292
293    /// True if the path exists (any file type, no symlink following);
294    /// `NotFound` becomes `false`.
295    pub async fn exists(&self, path: impl AsRef<Path>) -> Result<bool, ZeroFsError> {
296        match self.stat(path).await {
297            Ok(_) => Ok(true),
298            Err(ZeroFsError::NotFound { .. }) => Ok(false),
299            Err(e) => Err(e),
300        }
301    }
302
303    /// Apply any combination of metadata changes; returns post-change metadata.
304    pub async fn set_attr(
305        &self,
306        path: impl AsRef<Path>,
307        attrs: SetAttrs,
308    ) -> Result<Metadata, ZeroFsError> {
309        let path = path.as_ref();
310        let pd = display(path);
311        self.session.check_open()?;
312        let names = components(path)?;
313        let (guard, _) = self.session.walk(&names, &pd).await?;
314        let stat = self.session.setattr_fid(guard.fid(), &attrs, &pd).await?;
315        Ok(Metadata::from_stat(&stat))
316    }
317
318    /// Change permission bits.
319    pub async fn chmod(&self, path: impl AsRef<Path>, mode: u32) -> Result<Metadata, ZeroFsError> {
320        self.set_attr(
321            path,
322            SetAttrs {
323                mode: Some(mode),
324                ..Default::default()
325            },
326        )
327        .await
328    }
329
330    /// Change owner and/or group (`None` leaves a field untouched).
331    pub async fn chown(
332        &self,
333        path: impl AsRef<Path>,
334        uid: Option<u32>,
335        gid: Option<u32>,
336    ) -> Result<Metadata, ZeroFsError> {
337        self.set_attr(
338            path,
339            SetAttrs {
340                uid,
341                gid,
342                ..Default::default()
343            },
344        )
345        .await
346    }
347
348    /// Truncate or extend a file to `size` bytes.
349    pub async fn truncate(
350        &self,
351        path: impl AsRef<Path>,
352        size: u64,
353    ) -> Result<Metadata, ZeroFsError> {
354        self.set_attr(
355            path,
356            SetAttrs {
357                size: Some(size),
358                ..Default::default()
359            },
360        )
361        .await
362    }
363
364    /// Set access/modification times (utimens; `None` leaves a field untouched).
365    pub async fn set_times(
366        &self,
367        path: impl AsRef<Path>,
368        atime: Option<SetTime>,
369        mtime: Option<SetTime>,
370    ) -> Result<Metadata, ZeroFsError> {
371        self.set_attr(
372            path,
373            SetAttrs {
374                atime,
375                mtime,
376                ..Default::default()
377            },
378        )
379        .await
380    }
381
382    /// Filesystem-wide usage and limits.
383    pub async fn statfs(&self) -> Result<StatFs, ZeroFsError> {
384        self.session.check_open()?;
385        let r = self
386            .session
387            .client
388            .statfs(self.session.root_fid)
389            .await
390            .ctx("/")?;
391        Ok(StatFs::from_wire(&r))
392    }
393
394    /// Flush to durable (S3-backed) storage. On ZeroFS the server-side flush
395    /// is filesystem-global, so this is the durability endpoint for
396    /// write→rename sequences.
397    pub async fn sync(&self) -> Result<(), ZeroFsError> {
398        self.session.check_open()?;
399        self.session
400            .client
401            .fsync(self.session.root_fid, 0)
402            .await
403            .ctx("/")
404    }
405
406    /// Create a directory; the parent must exist.
407    pub async fn create_dir(
408        &self,
409        path: impl AsRef<Path>,
410        mode: u32,
411    ) -> Result<Metadata, ZeroFsError> {
412        let path = path.as_ref();
413        let pd = display(path);
414        let (dir_guard, name) = self.parent_of(path, &pd).await?;
415        self.session
416            .mkdir_at(dir_guard.fid(), name, mode, &pd)
417            .await
418    }
419
420    /// Create a directory and any missing ancestors.
421    pub async fn create_dir_all(
422        &self,
423        path: impl AsRef<Path>,
424        mode: u32,
425    ) -> Result<(), ZeroFsError> {
426        let path = path.as_ref();
427        self.session.check_open()?;
428        let names = components(path)?;
429        for depth in 1..=names.len() {
430            let prefix = &names[..depth];
431            let pd = display_path(prefix);
432            let (parents, name) = split_parent(prefix, &pd)?;
433            let (dir_guard, _) = self.session.walk(parents, &pd).await?;
434            match self
435                .session
436                .mkdir_at(dir_guard.fid(), name, mode, &pd)
437                .await
438            {
439                Ok(_) | Err(ZeroFsError::AlreadyExists { .. }) => {}
440                Err(e) => return Err(e),
441            }
442        }
443        // `AlreadyExists` was tolerated along the way; the call only succeeds if
444        // the final path resolves to a directory. Resolve symlinks (metadata,
445        // not stat) so an existing symlink-to-directory counts, as `std::fs` does.
446        if !names.is_empty() {
447            let meta = self.metadata(path).await?;
448            if !meta.is_dir() {
449                return Err(ZeroFsError::NotADirectory {
450                    path: display(path),
451                });
452            }
453        }
454        Ok(())
455    }
456
457    /// Remove a file, symlink, or device node.
458    pub async fn remove_file(&self, path: impl AsRef<Path>) -> Result<(), ZeroFsError> {
459        let path = path.as_ref();
460        let pd = display(path);
461        let (dir_guard, name) = self.parent_of(path, &pd).await?;
462        self.session
463            .client
464            .unlinkat(dir_guard.fid(), name, 0)
465            .await
466            .ctx(&pd)
467    }
468
469    /// Remove an empty directory.
470    pub async fn remove_dir(&self, path: impl AsRef<Path>) -> Result<(), ZeroFsError> {
471        let path = path.as_ref();
472        let pd = display(path);
473        let (dir_guard, name) = self.parent_of(path, &pd).await?;
474        self.session
475            .client
476            .unlinkat(dir_guard.fid(), name, libc::AT_REMOVEDIR as u32)
477            .await
478            .ctx(&pd)
479    }
480
481    /// Remove a directory and all its contents, recursively (client-side
482    /// walk, not atomic).
483    pub async fn remove_dir_all(&self, path: impl AsRef<Path>) -> Result<(), ZeroFsError> {
484        let path = path.as_ref();
485        self.session.check_open()?;
486        if components(path)?.is_empty() {
487            return Err(ZeroFsError::InvalidArgument {
488                message: "refusing to remove the attach root".to_string(),
489            });
490        }
491        let dir = self.open_dir(path).await?;
492        let result = remove_dir_contents(&dir).await;
493        dir.close().await;
494        result?;
495        self.remove_dir(path).await
496    }
497
498    /// Atomically rename/move within the filesystem; replaces an existing
499    /// target.
500    pub async fn rename(
501        &self,
502        from: impl AsRef<Path>,
503        to: impl AsRef<Path>,
504    ) -> Result<(), ZeroFsError> {
505        let (from, to) = (from.as_ref(), to.as_ref());
506        let (fd, td) = (display(from), display(to));
507        let (from_guard, from_name) = self.parent_of(from, &fd).await?;
508        let (to_guard, to_name) = self.parent_of(to, &td).await?;
509        self.session
510            .client
511            .renameat(from_guard.fid(), from_name, to_guard.fid(), to_name)
512            .await
513            .ctx(&fd)
514    }
515
516    /// Create a hard link at `link` pointing to the inode of `original`.
517    pub async fn hard_link(
518        &self,
519        original: impl AsRef<Path>,
520        link: impl AsRef<Path>,
521    ) -> Result<Metadata, ZeroFsError> {
522        let (original, link) = (original.as_ref(), link.as_ref());
523        let (od, ld) = (display(original), display(link));
524        let (dir_guard, link_name) = self.parent_of(link, &ld).await?;
525        let orig_names = components(original)?;
526        let (orig_guard, _) = self.session.walk(&orig_names, &od).await?;
527        self.session
528            .link_at(dir_guard.fid(), orig_guard.fid(), link_name, &ld)
529            .await
530    }
531
532    /// Create a symlink at `link_path` containing `target` (stored verbatim,
533    /// bytes included).
534    pub async fn symlink(
535        &self,
536        target: impl AsRef<Path>,
537        link_path: impl AsRef<Path>,
538    ) -> Result<Metadata, ZeroFsError> {
539        let link_path = link_path.as_ref();
540        let ld = display(link_path);
541        let (dir_guard, name) = self.parent_of(link_path, &ld).await?;
542        self.session
543            .symlink_at(
544                dir_guard.fid(),
545                name,
546                target.as_ref().as_os_str().as_bytes(),
547                &ld,
548            )
549            .await
550    }
551
552    /// Read a symlink target. Lossless: the target is returned byte-for-byte.
553    pub async fn read_link(&self, path: impl AsRef<Path>) -> Result<PathBuf, ZeroFsError> {
554        let path = path.as_ref();
555        let pd = display(path);
556        self.session.check_open()?;
557        let names = components(path)?;
558        let (guard, _) = self.session.walk(&names, &pd).await?;
559        let target = self.session.client.readlink(guard.fid()).await.ctx(&pd)?;
560        Ok(PathBuf::from(OsString::from_vec(target)))
561    }
562
563    /// Create a fifo, socket, or device node; `mode` carries permission bits
564    /// only; the type (and device numbers) come from `kind`.
565    pub async fn mknod(
566        &self,
567        path: impl AsRef<Path>,
568        kind: NodeKind,
569        mode: u32,
570    ) -> Result<Metadata, ZeroFsError> {
571        let path = path.as_ref();
572        let pd = display(path);
573        let (dir_guard, name) = self.parent_of(path, &pd).await?;
574        self.session
575            .mknod_at(dir_guard.fid(), name, kind, mode, &pd)
576            .await
577    }
578
579    /// List a whole directory (`.`/`..` excluded); metadata inline when the
580    /// server supports readdirplus.
581    pub async fn read_dir(&self, path: impl AsRef<Path>) -> Result<Vec<DirEntry>, ZeroFsError> {
582        let dir = self.open_dir(path).await?;
583        let mut out = Vec::new();
584        let result = loop {
585            match dir.next_batch(None).await {
586                Ok(batch) if batch.is_empty() => break Ok(out),
587                Ok(batch) => out.extend(batch),
588                Err(e) => break Err(e),
589            }
590        };
591        dir.close().await;
592        result
593    }
594
595    /// Open a directory for incremental listing and byte-exact child
596    /// operations.
597    pub async fn open_dir(&self, path: impl AsRef<Path>) -> Result<Arc<Dir>, ZeroFsError> {
598        let path = path.as_ref();
599        let pd = display(path);
600        self.session.check_open()?;
601        let names = components(path)?;
602        let (guard, stat) = self.session.walk(&names, &pd).await?;
603        if let Some(stat) = &stat
604            && FileType::from_mode(stat.mode) != FileType::Dir
605        {
606            return Err(ZeroFsError::NotADirectory { path: pd });
607        }
608        Ok(Dir::new(
609            Arc::clone(&self.session),
610            guard,
611            display_path(&names),
612        ))
613    }
614
615    /// Open (and optionally create) a file for positioned I/O.
616    pub async fn open(
617        &self,
618        path: impl AsRef<Path>,
619        opts: OpenOptions,
620    ) -> Result<Arc<File>, ZeroFsError> {
621        let path = path.as_ref();
622        let pd = display(path);
623        self.session.check_open()?;
624        let guard = self.open_relative_path(path, &pd, &opts).await?;
625        Ok(File::new(Arc::clone(&self.session), guard, pd))
626    }
627
628    /// Shorthand: open read-write with create+truncate, mode 0o644.
629    pub async fn create(&self, path: impl AsRef<Path>) -> Result<Arc<File>, ZeroFsError> {
630        self.open(path, OpenOptions::read_write().create(true).truncate(true))
631            .await
632    }
633
    /// Resolve symlinks in `path` (final and intermediate), returning the
    /// canonical components and the final stat. Relative targets resolve
    /// against the link's parent, absolute targets against the attach root
    /// (the client cannot see outside its attach).
    ///
    /// `pd` is the display form of `path`, used to label errors.
    ///
    /// # Errors
    ///
    /// Returns `TooManySymlinks` once more than `MAX_SYMLINK_HOPS` links have
    /// been followed. In the component-wise path, any walk, stat, or readlink
    /// failure is propagated; a failure of the initial literal walk is not
    /// reported directly but triggers the component-wise path instead.
    async fn resolve(&self, path: &Path, pd: &str) -> Result<(Vec<Vec<u8>>, Stat), ZeroFsError> {
        let session = &self.session;

        // Fast path: the literal walk succeeds and the final node is not a
        // symlink; done in one round trip. Any failure falls back to the
        // component-wise resolver to find the offending symlink.
        let literal: Vec<&[u8]> = components(path)?;
        if let Ok((_guard, stat)) = session.walk_stat_from(session.root_fid, &literal, pd).await
            && FileType::from_mode(stat.mode) != FileType::Symlink
        {
            return Ok((literal.iter().map(|c| c.to_vec()).collect(), stat));
        }

        // `todo` holds the components still to be processed (symlink targets
        // are spliced in at the front); `stack` holds the canonical
        // components resolved so far.
        let mut todo: VecDeque<Vec<u8>> = literal.iter().map(|c| c.to_vec()).collect();
        let mut stack: Vec<Vec<u8>> = Vec::new();
        let mut hops = 0u32;
        // A fid pinned at the directory `stack` denotes, advanced one
        // component at a time.
        let (mut cur, _) = session.walk(&[], pd).await?;
        let mut cur_stat = session.stat_fid(cur.fid(), pd).await?;

        while let Some(name) = todo.pop_front() {
            if name == b".." {
                // Only reachable via a symlink target; resolve against the
                // canonical stack ("/.." stays at the root, like POSIX).
                stack.pop();
                let refs: Vec<&[u8]> = stack.iter().map(|c| c.as_slice()).collect();
                let (guard, stat) = session.walk_stat_from(session.root_fid, &refs, pd).await?;
                cur = guard;
                cur_stat = stat;
                continue;
            }

            // Step one component down from the current directory.
            let (guard, stat) = session
                .walk_stat_from(cur.fid(), &[name.as_slice()], pd)
                .await?;
            if FileType::from_mode(stat.mode) == FileType::Symlink {
                // The hop budget is shared by the whole resolution, not
                // tracked per component.
                hops += 1;
                if hops > MAX_SYMLINK_HOPS {
                    return Err(ZeroFsError::TooManySymlinks {
                        path: pd.to_string(),
                    });
                }
                let target = session.client.readlink(guard.fid()).await.ctx(pd)?;
                // An absolute target restarts from the attach root; a
                // relative one leaves `cur`/`stack` at the link's parent.
                if target.first() == Some(&b'/') {
                    stack.clear();
                    let (root_clone, _) = session.walk(&[], pd).await?;
                    cur = root_clone;
                    cur_stat = session.stat_fid(cur.fid(), pd).await?;
                }
                // Prepend the target's components ahead of the remaining path.
                // Empty and "." components are dropped; iterating in reverse
                // with `push_front` keeps the target's original order.
                for comp in target
                    .split(|&b| b == b'/')
                    .filter(|c| !c.is_empty() && *c != b".")
                    .rev()
                {
                    todo.push_front(comp.to_vec());
                }
            } else {
                // Ordinary component: it becomes part of the canonical path.
                stack.push(name);
                cur = guard;
                cur_stat = stat;
            }
        }

        Ok((stack, cur_stat))
    }
705}
706
707/// Empty a directory recursively: lists in rounds (rewinding between them so
708/// deletion never races the cursor) until nothing is left.
709fn remove_dir_contents<'a>(
710    dir: &'a Dir,
711) -> std::pin::Pin<Box<dyn Future<Output = Result<(), ZeroFsError>> + Send + 'a>> {
712    Box::pin(async move {
713        loop {
714            dir.rewind().await?;
715            let batch = dir.next_batch(None).await?;
716            if batch.is_empty() {
717                return Ok(());
718            }
719            for entry in batch {
720                if entry.file_type == FileType::Dir {
721                    let child = dir.open_dir_at(&entry.name_bytes).await?;
722                    let result = remove_dir_contents(&child).await;
723                    child.close().await;
724                    result?;
725                    dir.remove_dir_at(&entry.name_bytes).await?;
726                } else {
727                    dir.remove_file_at(&entry.name_bytes).await?;
728                }
729            }
730        }
731    })
732}
733
734/// Open a transport to `target`. Mirrors the FUSE mount's target grammar.
735async fn dial(target: &str, msize: u32) -> Result<Arc<NinePClient>, ZeroFsError> {
736    let connect_failed = |message: String| ZeroFsError::ConnectFailed { message };
737
738    if let Some(rest) = target.strip_prefix("unix:") {
739        let path = rest.strip_prefix("//").unwrap_or(rest);
740        return NinePClient::connect_unix(path, msize)
741            .await
742            .map_err(|e| connect_failed(format!("9P unix socket {path}: {e}")));
743    }
744
745    let hostport = target.strip_prefix("tcp://").unwrap_or(target);
746
747    // A path-like target without a scheme is treated as a unix socket.
748    if hostport.starts_with('/') || hostport.starts_with('.') {
749        return NinePClient::connect_unix(hostport, msize)
750            .await
751            .map_err(|e| connect_failed(format!("9P unix socket {hostport}: {e}")));
752    }
753
754    let addr = resolve_addr(hostport).await?;
755    NinePClient::connect_tcp(addr, msize)
756        .await
757        .map_err(|e| connect_failed(format!("9P server {addr}: {e}")))
758}
759
760async fn resolve_addr(s: &str) -> Result<std::net::SocketAddr, ZeroFsError> {
761    if let Ok(addr) = s.parse::<std::net::SocketAddr>() {
762        return Ok(addr);
763    }
764    let with_port = if s.contains(':') {
765        s.to_string()
766    } else {
767        format!("{s}:{DEFAULT_9P_PORT}")
768    };
769    tokio::net::lookup_host(&with_port)
770        .await
771        .map_err(|e| ZeroFsError::ConnectFailed {
772            message: format!("resolving {with_port}: {e}"),
773        })?
774        .next()
775        .ok_or_else(|| ZeroFsError::ConnectFailed {
776            message: format!("no addresses resolved for {with_port}"),
777        })
778}