Skip to main content

libfuse_fs/unionfs/
mod.rs

1// Copyright (C) 2023 Ant Group. All rights reserved.
2//  2024 From [fuse_backend_rs](https://github.com/cloud-hypervisor/fuse-backend-rs)
3// SPDX-License-Identifier: Apache-2.0
4
5#![allow(missing_docs)]
6mod async_io;
7pub mod config;
8mod inode_store;
9pub mod layer;
10mod utils;
11
12//mod tempfile;
13use core::panic;
14use std::collections::HashMap;
15use std::ffi::{OsStr, OsString};
16use std::future::Future;
17use std::io::{Error, Result};
18use std::path::Path;
19
20use config::Config;
21use futures::StreamExt as _;
22use rfuse3::raw::reply::{
23    DirectoryEntry, DirectoryEntryPlus, ReplyAttr, ReplyEntry, ReplyOpen, ReplyStatFs,
24};
25use rfuse3::raw::{Request, Session};
26use std::sync::{Arc, Weak};
27use tracing::debug;
28use tracing::error;
29use tracing::info;
30use tracing::trace;
31
32use rfuse3::{Errno, FileType, MountOptions, mode_from_kind_and_perm};
33const SLASH_ASCII: char = '/';
34use futures::future::join_all;
35use futures::stream::iter;
36
37use crate::passthrough::{PassthroughArgs, new_passthroughfs_layer};
38use crate::util::convert_stat64_to_file_attr;
39use inode_store::InodeStore;
40use layer::Layer;
41use rfuse3::raw::logfs::LoggingFileSystem;
42use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
43
44use tokio::sync::{Mutex, RwLock};
45
/// Inode number type used throughout this module (a plain `u64`).
pub type Inode = u64;
/// File-handle identifier type used throughout this module (a plain `u64`).
pub type Handle = u64;

/// Unsized trait-object form of [`Layer`]; the structs in this file hold it as `Arc<BoxedLayer>`.
pub(crate) type BoxedLayer = dyn Layer;
//type BoxedFileSystem = Box<dyn FileSystem<Inode = Inode, Handle = Handle> + Send + Sync>;
// NOTE(review): presumably the size of one inode-number allocation batch (2^32);
// its use is not visible in this part of the file — confirm against the allocator.
const INODE_ALLOC_BATCH: u64 = 0x1_0000_0000;
/// RealInode represents one inode object in one specific layer.
///
/// Each RealInode maps to one entry of that layer, which is 'forgotten' when the
/// value is dropped (see the `Drop` implementation, which sends `forget(inode, 1)`).
/// Important note: do not impl Clone trait for it or refcount will be messed up.
pub(crate) struct RealInode {
    /// Layer that owns this inode; all operations on the inode are forwarded to it.
    pub layer: Arc<BoxedLayer>,
    /// True when `layer` is the upper layer. The creating methods
    /// (`mkdir`, `create`, `mknod`, ...) return `EROFS` when this is false.
    pub in_upper_layer: bool,
    /// Inode number inside `layer`. `stat64` treats 0 as "no such entry" (`ENOENT`).
    pub inode: u64,
    /// File is whiteouted, we need to hide it.
    pub whiteout: bool,
    /// Directory is opaque, we need to hide all entries inside it.
    pub opaque: bool,
    /// Attributes captured when this value was built, when they were available.
    pub stat: Option<ReplyAttr>,
}
65
/// One node of the merged (overlay) view, backed by one or more `RealInode`s.
///
/// OverlayInode must be protected by lock, it can be operated by multiple threads;
/// the mutable parts are therefore wrapped in locks or atomics.
// #[derive(Default)]
pub(crate) struct OverlayInode {
    /// Inode hash table, map from 'name' to 'OverlayInode'.
    pub childrens: Mutex<HashMap<String, Arc<OverlayInode>>>,
    /// Weak reference to the parent node; `create_upper_dir` upgrades it to reach the parent.
    pub parent: Mutex<Weak<OverlayInode>>,
    /// Backend inodes from all layers. `add_upper_inode` places the upper-layer
    /// inode at the front, so iteration goes from upper to lower layers.
    pub real_inodes: Mutex<Vec<Arc<RealInode>>>,
    /// Inode number.
    pub inode: u64,
    /// Path of this node; children built by `scan_childrens` get `"<parent path>/<name>"`.
    pub path: RwLock<String>,
    /// Entry name of this node inside its parent directory.
    pub name: RwLock<String>,
    /// Lookup counter; `new_from_real_inode` starts it at 1.
    // NOTE(review): presumably the FUSE lookup refcount — confirm against the lookup/forget paths.
    pub lookups: AtomicU64,
    /// Node is whiteout-ed.
    pub whiteout: AtomicBool,
    /// Directory is loaded.
    pub loaded: AtomicBool,
}
84
/// Cache policy selector; the default variant is [`CachePolicy::Auto`].
// NOTE(review): how each variant is applied is not visible in this part of the
// file — the per-variant notes below are assumptions based on the names; confirm at the use sites.
#[derive(Default)]
pub enum CachePolicy {
    /// Presumably: never cache.
    Never,
    /// Presumably: let the implementation decide (default).
    #[default]
    Auto,
    /// Presumably: always cache.
    Always,
}
/// The overlay filesystem: a set of lower layers plus an optional upper layer,
/// together with the inode store and the table of open handles.
pub struct OverlayFs {
    /// Configuration this filesystem was created with.
    config: Config,
    /// Lower layers.
    // NOTE(review): presumably read-only and ordered from topmost to bottommost — confirm.
    lower_layers: Vec<Arc<BoxedLayer>>,
    /// Upper layer, if any.
    upper_layer: Option<Arc<BoxedLayer>>,
    /// All inodes in FS.
    inodes: RwLock<InodeStore>,
    /// Open file handles.
    handles: Mutex<HashMap<u64, Arc<HandleData>>>,
    /// Counter for handle numbers.
    // NOTE(review): presumably the next key to use in `handles` — confirm at the allocation site.
    next_handle: AtomicU64,
    // NOTE(review): the flags below look like negotiated FUSE capabilities
    // (writeback cache, no-open, no-opendir, killpriv v2, per-file DAX);
    // where they are set and read is not visible here — confirm.
    writeback: AtomicBool,
    no_open: AtomicBool,
    no_opendir: AtomicBool,
    killpriv_v2: AtomicBool,
    perfile_dax: AtomicBool,
    // NOTE(review): meaning not visible in this part of the file (root inode number?) — confirm.
    root_inodes: u64,
}
108
/// This is a wrapper of one inode in specific layer, It can't impl Clone trait.
struct RealHandle {
    /// Layer the handle belongs to.
    layer: Arc<BoxedLayer>,
    /// True when `layer` is the upper layer.
    in_upper_layer: bool,
    /// Inode number inside `layer`.
    inode: u64,
    /// Handle value.
    // NOTE(review): presumably the file handle returned by the layer's open call — confirm.
    handle: AtomicU64,
}
116
/// State kept for one open handle of the overlay filesystem.
struct HandleData {
    /// Overlay node this handle refers to.
    node: Arc<OverlayInode>,
    //offset: libc::off_t,
    /// Handle in the backing layer, when there is one.
    real_handle: Option<RealHandle>,
    /// Cache the directory entries for stable readdir offsets.
    /// The snapshot contains all necessary info to avoid re-accessing childrens map.
    dir_snapshot: Mutex<Option<Vec<DirectoryEntryPlus>>>,
}
125
126// RealInode is a wrapper of one inode in specific layer.
127// All layer operations returning Entry should be wrapped in RealInode implementation
128// so that we can increase the refcount(lookup count) of each inode and decrease it after Drop.
129// Important: do not impl 'Copy' trait for it or refcount will be messed up.
130impl RealInode {
131    async fn new(
132        layer: Arc<BoxedLayer>,
133        in_upper_layer: bool,
134        inode: u64,
135        whiteout: bool,
136        opaque: bool,
137    ) -> Self {
138        let mut ri = RealInode {
139            layer,
140            in_upper_layer,
141            inode,
142            whiteout,
143            opaque,
144            stat: None,
145        };
146        match ri.stat64_ignore_enoent(&Request::default()).await {
147            Ok(v) => {
148                ri.stat = v;
149            }
150            Err(e) => {
151                error!("stat64 failed during RealInode creation: {e}");
152            }
153        }
154        ri
155    }
156
157    async fn stat64(&self, req: &Request) -> Result<ReplyAttr> {
158        let layer = self.layer.as_ref();
159        if self.inode == 0 {
160            return Err(Error::from_raw_os_error(libc::ENOENT));
161        }
162        // trace!("stat64: trying to getattr req: {:?}", req);
163        layer
164            .getattr(*req, self.inode, None, 0)
165            .await
166            .map_err(|e| e.into())
167    }
168
169    async fn stat64_ignore_enoent(&self, req: &Request) -> Result<Option<ReplyAttr>> {
170        match self.stat64(req).await {
171            Ok(v1) => Ok(Some(v1)),
172            Err(e) => match e.raw_os_error() {
173                Some(raw_error) => {
174                    if raw_error == libc::ENOENT
175                        || raw_error == libc::ENAMETOOLONG
176                        || raw_error == libc::ESTALE
177                    {
178                        return Ok(None);
179                    }
180                    Err(e)
181                }
182                None => Err(e),
183            },
184        }
185    }
186
187    // Do real lookup action in specific layer, this call will increase Entry refcount which must be released later.
188    async fn lookup_child_ignore_enoent(
189        &self,
190        ctx: Request,
191        name: &str,
192    ) -> Result<Option<ReplyEntry>> {
193        let cname = OsStr::new(name);
194        // Real inode must have a layer.
195        let layer = self.layer.as_ref();
196        match layer.lookup(ctx, self.inode, cname).await {
197            Ok(v) => {
198                // Negative entry also indicates missing entry.
199                if v.attr.ino == 0 {
200                    return Ok(None);
201                }
202                Ok(Some(v))
203            }
204            Err(e) => {
205                let ioerror: std::io::Error = e.into();
206                if let Some(raw_error) = ioerror.raw_os_error()
207                    && (raw_error == libc::ENOENT || raw_error == libc::ENAMETOOLONG)
208                {
209                    return Ok(None);
210                }
211
212                Err(e.into())
213            }
214        }
215    }
216
217    // Find child inode in same layer under this directory(Self).
218    // Return None if not found.
219    async fn lookup_child(&self, ctx: Request, name: &str) -> Result<Option<RealInode>> {
220        if self.whiteout {
221            return Ok(None);
222        }
223
224        let layer = self.layer.as_ref();
225
226        // Find child Entry with <name> under directory with inode <self.inode>.
227        match self.lookup_child_ignore_enoent(ctx, name).await? {
228            Some(v) => {
229                // The Entry must be forgotten in each layer, which will be done automatically by Drop operation.
230                let (whiteout, opaque) = if v.attr.kind == FileType::Directory {
231                    (false, layer.is_opaque(ctx, v.attr.ino).await?)
232                } else {
233                    (layer.is_whiteout(ctx, v.attr.ino).await?, false)
234                };
235
236                Ok(Some(RealInode {
237                    layer: self.layer.clone(),
238                    in_upper_layer: self.in_upper_layer,
239                    inode: v.attr.ino,
240                    whiteout,
241                    opaque,
242                    stat: Some(ReplyAttr {
243                        ttl: v.ttl,
244                        attr: v.attr,
245                    }),
246                }))
247            }
248            None => Ok(None),
249        }
250    }
251
252    // Read directory entries from specific RealInode, error out if it's not directory.
253    async fn readdir(&self, ctx: Request) -> Result<HashMap<String, RealInode>> {
254        // Deleted inode should not be read.
255        if self.whiteout {
256            return Err(Error::from_raw_os_error(libc::ENOENT));
257        }
258        // trace!("readdir: before stat");
259        let stat = match self.stat.clone() {
260            Some(v) => v,
261            None => self.stat64(&ctx).await?,
262        };
263
264        // Must be directory.
265        if stat.attr.kind != FileType::Directory {
266            return Err(Error::from_raw_os_error(libc::ENOTDIR));
267        }
268
269        // Open the directory and load each entry.
270        let opendir_res = self
271            .layer
272            .opendir(ctx, self.inode, libc::O_RDONLY as u32)
273            .await;
274        // trace!("readdir: after opendir");
275        let handle = match opendir_res {
276            Ok(handle) => handle,
277
278            // opendir may not be supported if no_opendir is set, so we can ignore this error.
279            Err(e) => {
280                let ioerror: std::io::Error = e.into();
281                match ioerror.raw_os_error() {
282                    Some(raw_error) => {
283                        if raw_error == libc::ENOSYS {
284                            // We can still call readdir with inode if opendir is not supported in this layer.
285                            ReplyOpen { fh: 0, flags: 0 }
286                        } else {
287                            return Err(e.into());
288                        }
289                    }
290                    None => {
291                        return Err(e.into());
292                    }
293                }
294            }
295        };
296
297        let child_names = self.layer.readdir(ctx, self.inode, handle.fh, 0).await?;
298        // Non-zero handle indicates successful 'open', we should 'release' it.
299        if handle.fh > 0 {
300            self.layer
301                .releasedir(ctx, self.inode, handle.fh, handle.flags)
302                .await?
303            //DIFF
304        }
305
306        // Lookup all child and construct "RealInode"s.
307        let child_real_inodes = Arc::new(Mutex::new(HashMap::new()));
308        // trace!("readdir: before iter childrens");
309        let a_map = child_names.entries.map(|entery| async {
310            match entery {
311                Ok(dire) => {
312                    let dname = dire.name.into_string().unwrap();
313                    if dname == "." || dname == ".." {
314                        // Skip . and .. entries.
315                        return Ok(());
316                    }
317                    // trace!("readdir: before lookup child: dname={}", dname);
318                    if let Some(child) = self.lookup_child(ctx, &dname).await? {
319                        child_real_inodes.lock().await.insert(dname, child);
320                    }
321                    Ok(())
322                }
323                Err(err) => Err(err),
324            }
325        });
326        let k = join_all(a_map.collect::<Vec<_>>().await).await;
327        drop(k);
328        // Now into_inner func is safety.
329        let re = Arc::try_unwrap(child_real_inodes)
330            .map_err(|_| Errno::new_not_exist())?
331            .into_inner();
332        // trace!("readdir: return");
333        Ok(re)
334    }
335
336    async fn create_whiteout(&self, ctx: Request, name: &str) -> Result<RealInode> {
337        if !self.in_upper_layer {
338            return Err(Error::from_raw_os_error(libc::EROFS));
339        }
340
341        // from &str to &OsStr
342        let name_osstr = OsStr::new(name);
343        let entry = self
344            .layer
345            .create_whiteout(ctx, self.inode, name_osstr)
346            .await?;
347
348        // Wrap whiteout to RealInode.
349        Ok(RealInode {
350            layer: self.layer.clone(),
351            in_upper_layer: true,
352            inode: entry.attr.ino,
353            whiteout: true,
354            opaque: false,
355            stat: Some(ReplyAttr {
356                ttl: entry.ttl,
357                attr: entry.attr,
358            }),
359        })
360    }
361
362    async fn mkdir(&self, ctx: Request, name: &str, mode: u32, umask: u32) -> Result<RealInode> {
363        if !self.in_upper_layer {
364            return Err(Error::from_raw_os_error(libc::EROFS));
365        }
366
367        let name_osstr = OsStr::new(name);
368        let entry = self
369            .layer
370            .mkdir(ctx, self.inode, name_osstr, mode, umask)
371            .await?;
372
373        // update node's first_layer
374        Ok(RealInode {
375            layer: self.layer.clone(),
376            in_upper_layer: true,
377            inode: entry.attr.ino,
378            whiteout: false,
379            opaque: false,
380            stat: Some(ReplyAttr {
381                ttl: entry.ttl,
382                attr: entry.attr,
383            }),
384        })
385    }
386
387    async fn create(
388        &self,
389        ctx: Request,
390        name: &str,
391        mode: u32,
392        flags: u32,
393    ) -> Result<(RealInode, Option<u64>)> {
394        if !self.in_upper_layer {
395            return Err(Error::from_raw_os_error(libc::EROFS));
396        }
397        let name = OsStr::new(name);
398        let create_rep = self
399            .layer
400            .create(ctx, self.inode, name, mode, flags)
401            .await?;
402
403        Ok((
404            RealInode {
405                layer: self.layer.clone(),
406                in_upper_layer: true,
407                inode: create_rep.attr.ino,
408                whiteout: false,
409                opaque: false,
410                stat: Some(ReplyAttr {
411                    ttl: create_rep.ttl,
412                    attr: create_rep.attr,
413                }),
414            },
415            Some(create_rep.fh),
416        ))
417    }
418
419    async fn mknod(
420        &self,
421        ctx: Request,
422        name: &str,
423        mode: u32,
424        rdev: u32,
425        _umask: u32,
426    ) -> Result<RealInode> {
427        if !self.in_upper_layer {
428            return Err(Error::from_raw_os_error(libc::EROFS));
429        }
430        let name = OsStr::new(name);
431        let rep = self.layer.mknod(ctx, self.inode, name, mode, rdev).await?;
432        Ok(RealInode {
433            layer: self.layer.clone(),
434            in_upper_layer: true,
435            inode: rep.attr.ino,
436            whiteout: false,
437            opaque: false,
438            stat: Some(ReplyAttr {
439                ttl: rep.ttl,
440                attr: rep.attr,
441            }),
442        })
443    }
444
445    async fn link(&self, ctx: Request, ino: u64, name: &str) -> Result<RealInode> {
446        if !self.in_upper_layer {
447            return Err(Error::from_raw_os_error(libc::EROFS));
448        }
449        let name = OsStr::new(name);
450        let entry = self.layer.link(ctx, ino, self.inode, name).await?;
451
452        let opaque = if utils::is_dir(&entry.attr.kind) {
453            self.layer.is_opaque(ctx, entry.attr.ino).await?
454        } else {
455            false
456        };
457        Ok(RealInode {
458            layer: self.layer.clone(),
459            in_upper_layer: true,
460            inode: entry.attr.ino,
461            whiteout: false,
462            opaque,
463            stat: Some(ReplyAttr {
464                ttl: entry.ttl,
465                attr: entry.attr,
466            }),
467        })
468    }
469
470    // Create a symlink in self directory.
471    async fn symlink(&self, ctx: Request, link_name: &str, filename: &str) -> Result<RealInode> {
472        if !self.in_upper_layer {
473            return Err(Error::from_raw_os_error(libc::EROFS));
474        }
475        let link_name = OsStr::new(link_name);
476        let filename = OsStr::new(filename);
477        let entry = self
478            .layer
479            .symlink(ctx, self.inode, filename, link_name)
480            .await?;
481
482        Ok(RealInode {
483            layer: self.layer.clone(),
484            in_upper_layer: true,
485            inode: entry.attr.ino,
486            whiteout: false,
487            opaque: false,
488            stat: Some(ReplyAttr {
489                ttl: entry.ttl,
490                attr: entry.attr,
491            }),
492        })
493    }
494}
495
496impl Drop for RealInode {
497    fn drop(&mut self) {
498        let layer = Arc::clone(&self.layer);
499        let inode = self.inode;
500        tokio::spawn(async move {
501            let ctx = Request::default();
502            layer.forget(ctx, inode, 1).await;
503        });
504    }
505}
506
507impl OverlayInode {
508    pub fn new() -> Self {
509        Self {
510            childrens: Mutex::new(HashMap::new()),
511            parent: Mutex::new(Weak::new()),
512            real_inodes: Mutex::new(vec![]),
513            inode: 0,
514            path: RwLock::new(String::new()),
515            name: RwLock::new(String::new()),
516            lookups: AtomicU64::new(0),
517            whiteout: AtomicBool::new(false),
518            loaded: AtomicBool::new(false),
519        }
520    }
521    // Allocate new OverlayInode based on one RealInode,
522    // inode number is always 0 since only OverlayFs has global unique inode allocator.
523    pub async fn new_from_real_inode(
524        name: &str,
525        ino: u64,
526        path: String,
527        real_inode: RealInode,
528    ) -> Self {
529        let mut new = OverlayInode::new();
530        new.inode = ino;
531        new.path = path.into();
532        new.name = name.to_string().into();
533        new.whiteout.store(real_inode.whiteout, Ordering::Relaxed);
534        new.lookups = AtomicU64::new(1);
535        new.real_inodes = Mutex::new(vec![real_inode.into()]);
536        new
537    }
538
539    pub async fn new_from_real_inodes(
540        name: &str,
541        ino: u64,
542        path: String,
543        real_inodes: Vec<RealInode>,
544    ) -> Result<Self> {
545        if real_inodes.is_empty() {
546            error!("BUG: new_from_real_inodes() called with empty real_inodes");
547            return Err(Error::from_raw_os_error(libc::EINVAL));
548        }
549
550        let mut first = true;
551        let mut new = Self::new();
552        for ri in real_inodes {
553            let whiteout = ri.whiteout;
554            let opaque = ri.opaque;
555            let stat = match &ri.stat {
556                Some(v) => v.clone(),
557                None => ri.stat64(&Request::default()).await?,
558            };
559
560            if first {
561                first = false;
562                new = Self::new_from_real_inode(name, ino, path.clone(), ri).await;
563
564                // This is whiteout, no need to check lower layers.
565                if whiteout {
566                    break;
567                }
568
569                // A non-directory file shadows all lower layers as default.
570                if !utils::is_dir(&stat.attr.kind) {
571                    break;
572                }
573
574                // Opaque directory shadows all lower layers.
575                if opaque {
576                    break;
577                }
578            } else {
579                // This is whiteout, no need to record this, break directly.
580                if ri.whiteout {
581                    break;
582                }
583
584                // Only directory have multiple real inodes, so if this is non-first real-inode
585                // and it's not directory, it should indicates some invalid layout. @weizhang555
586                if !utils::is_dir(&stat.attr.kind) {
587                    error!("invalid layout: non-directory has multiple real inodes");
588                    break;
589                }
590
591                // Valid directory.
592                new.real_inodes.lock().await.push(ri.into());
593                // Opaque directory shadows all lower layers.
594                if opaque {
595                    break;
596                }
597            }
598        }
599        Ok(new)
600    }
601
602    pub async fn stat64(&self, ctx: Request) -> Result<ReplyAttr> {
603        // try layers in order or just take stat from first layer?
604        for l in self.real_inodes.lock().await.iter() {
605            if let Some(v) = l.stat64_ignore_enoent(&ctx).await? {
606                return Ok(v);
607            }
608        }
609
610        // not in any layer
611        Err(Error::from_raw_os_error(libc::ENOENT))
612    }
613
614    pub async fn is_dir(&self, ctx: Request) -> Result<bool> {
615        let st = self.stat64(ctx).await?;
616        Ok(utils::is_dir(&st.attr.kind))
617    }
618
619    pub async fn count_entries_and_whiteout(&self, ctx: Request) -> Result<(u64, u64)> {
620        let mut count = 0;
621        let mut whiteouts = 0;
622
623        let st = self.stat64(ctx).await?;
624
625        // must be directory
626        if !utils::is_dir(&st.attr.kind) {
627            return Err(Error::from_raw_os_error(libc::ENOTDIR));
628        }
629
630        for (_, child) in self.childrens.lock().await.iter() {
631            if child.whiteout.load(Ordering::Relaxed) {
632                whiteouts += 1;
633            } else {
634                count += 1;
635            }
636        }
637        Ok((count, whiteouts))
638    }
639
640    pub async fn open(
641        &self,
642        ctx: Request,
643        flags: u32,
644        _fuse_flags: u32,
645    ) -> Result<(Arc<BoxedLayer>, ReplyOpen)> {
646        let (layer, _, inode) = self.first_layer_inode().await;
647        let ro = layer.as_ref().open(ctx, inode, flags).await?;
648        Ok((layer, ro))
649    }
650
651    // Self is directory, fill all childrens.
652    pub async fn scan_childrens(self: &Arc<Self>, ctx: Request) -> Result<Vec<OverlayInode>> {
653        let st = self.stat64(ctx).await?;
654        if !utils::is_dir(&st.attr.kind) {
655            return Err(Error::from_raw_os_error(libc::ENOTDIR));
656        }
657
658        let mut all_layer_inodes: HashMap<String, Vec<RealInode>> = HashMap::new();
659        // read out directories from each layer
660        // Scan from upper layer to lower layer.
661        for ri in self.real_inodes.lock().await.iter() {
662            if ri.whiteout {
663                // Node is deleted from some upper layer, skip it.
664                debug!("directory is whiteout");
665                break;
666            }
667
668            let stat = match &ri.stat {
669                Some(v) => v.clone(),
670                None => ri.stat64(&ctx).await?,
671            };
672
673            if !utils::is_dir(&stat.attr.kind) {
674                debug!("{} is not a directory", self.path.read().await);
675                // not directory
676                break;
677            }
678
679            // Read all entries from one layer.
680            let entries: HashMap<String, RealInode> = ri.readdir(ctx).await?;
681
682            // Merge entries from one layer to all_layer_inodes.
683            for (name, inode) in entries {
684                match all_layer_inodes.get_mut(&name) {
685                    Some(v) => {
686                        // Append additional RealInode to the end of vector.
687                        v.push(inode)
688                    }
689                    None => {
690                        all_layer_inodes.insert(name, vec![inode]);
691                    }
692                }
693            }
694
695            // if opaque, stop here
696            if ri.opaque {
697                debug!("directory {} is opaque", self.path.read().await);
698                break;
699            }
700        }
701
702        // Construct OverlayInode for each entry.
703        let mut childrens = vec![];
704        for (name, real_inodes) in all_layer_inodes {
705            // Inode numbers are not allocated yet.
706            let path = format!("{}/{}", self.path.read().await, name);
707            let new = Self::new_from_real_inodes(name.as_str(), 0, path, real_inodes).await?;
708            childrens.push(new);
709        }
710
711        Ok(childrens)
712    }
713
714    /// Create a new directory in upper layer for node, node must be directory.
715    ///
716    /// Recursively ensures a directory path exists in the upper layer.
717    ///
718    /// This function is a critical part of the copy-up process. When a file or directory
719    /// needs to be copied up, this function is called on its parent to ensure the entire
720    /// directory hierarchy exists in the upper layer first. It works recursively:
721    /// 1. If the current directory is already in the upper layer, it does nothing.
722    /// 2. If not, it first calls itself on its own parent directory.
723    /// 3. Once the parent is guaranteed to be in the upper layer, it creates the current
724    ///    directory within the parent's upper-layer representation.
725    ///
726    /// Crucially, it preserves the original directory's ownership (UID/GID) and permissions
727    /// by using [`getattr_with_mapping`][crate::unionfs::layer::Layer::getattr_with_mapping] and
728    /// [`mkdir_with_context`][crate::unionfs::layer::Layer::mkdir_with_context] with [`OperationContext`][crate::context::OperationContext].
729    pub async fn create_upper_dir(
730        self: Arc<Self>,
731        ctx: Request,
732        mode_umask: Option<(u32, u32)>,
733    ) -> Result<()> {
734        // To preserve original ownership, we must get the raw, unmapped host attributes.
735        // We achieve this by calling `do_getattr_helper`, which is specifically designed
736        // to bypass the ID mapping logic. This is safe and does not affect other
737        // functionalities because `do_getattr_helper` and the standard `stat64()` call
738        // both rely on the same underlying `stat` system call; they only differ in
739        // whether the resulting `uid` and `gid` are mapped.
740        let (self_layer, _, self_inode) = self.first_layer_inode().await;
741        let re = self_layer
742            .getattr_with_mapping(self_inode, None, false)
743            .await?;
744        let st = ReplyAttr {
745            ttl: re.1,
746            attr: convert_stat64_to_file_attr(re.0),
747        };
748        if !utils::is_dir(&st.attr.kind) {
749            return Err(Error::from_raw_os_error(libc::ENOTDIR));
750        }
751
752        // If node already has upper layer, we can just return here.
753        if self.in_upper_layer().await {
754            return Ok(());
755        }
756
757        // not in upper layer, check parent.
758        let pnode = if let Some(n) = self.parent.lock().await.upgrade() {
759            Arc::clone(&n)
760        } else {
761            return Err(Error::other("no parent?"));
762        };
763
764        if !pnode.in_upper_layer().await {
765            Box::pin(pnode.clone().create_upper_dir(ctx, None)).await?; // recursive call
766        }
767        let child: Arc<Mutex<Option<RealInode>>> = Arc::new(Mutex::new(None));
768        let c_name = self.name.read().await.clone();
769        let _ = pnode
770            .handle_upper_inode_locked(&mut |parent_upper_inode: Option<Arc<RealInode>>| async {
771                match parent_upper_inode {
772                    Some(parent_ri) => {
773                        let ri = match mode_umask {
774                            // We manually unfold the `mkdir` logic here instead of calling the `mkdir` method directly.
775                            // This is necessary to preserve the original directory's UID and GID during the copy-up process.
776                            Some((mode, umask)) => {
777                                if !parent_ri.in_upper_layer {
778                                    return Err(Error::from_raw_os_error(libc::EROFS));
779                                }
780                                let name_osstr = OsStr::new(&c_name);
781                                let op_ctx = crate::context::OperationContext::with_credentials(
782                                    ctx,
783                                    st.attr.uid,
784                                    st.attr.gid,
785                                );
786                                let entry = parent_ri
787                                    .layer
788                                    .mkdir_with_context(
789                                        op_ctx,
790                                        parent_ri.inode,
791                                        name_osstr,
792                                        mode,
793                                        umask,
794                                    )
795                                    .await?;
796                                RealInode {
797                                    layer: parent_ri.layer.clone(),
798                                    in_upper_layer: true,
799                                    inode: entry.attr.ino,
800                                    whiteout: false,
801                                    opaque: false,
802                                    stat: Some(ReplyAttr {
803                                        ttl: entry.ttl,
804                                        attr: entry.attr,
805                                    }),
806                                }
807                            }
808                            None => {
809                                if !parent_ri.in_upper_layer {
810                                    return Err(Error::from_raw_os_error(libc::EROFS));
811                                }
812                                let name_osstr = OsStr::new(&c_name);
813                                let op_ctx = crate::context::OperationContext::with_credentials(
814                                    ctx,
815                                    st.attr.uid,
816                                    st.attr.gid,
817                                );
818                                let entry = parent_ri
819                                    .layer
820                                    .mkdir_with_context(
821                                        op_ctx,
822                                        parent_ri.inode,
823                                        name_osstr,
824                                        mode_from_kind_and_perm(st.attr.kind, st.attr.perm),
825                                        0,
826                                    )
827                                    .await?;
828                                RealInode {
829                                    layer: parent_ri.layer.clone(),
830                                    in_upper_layer: true,
831                                    inode: entry.attr.ino,
832                                    whiteout: false,
833                                    opaque: false,
834                                    stat: Some(ReplyAttr {
835                                        ttl: entry.ttl,
836                                        attr: entry.attr,
837                                    }),
838                                }
839                            }
840                        };
841                        // create directory here
842                        child.lock().await.replace(ri);
843                    }
844                    None => {
845                        error!(
846                            "BUG: parent {} has no upper inode after create_upper_dir",
847                            pnode.inode
848                        );
849                        return Err(Error::from_raw_os_error(libc::EINVAL));
850                    }
851                }
852                Ok(false)
853            })
854            .await?;
855
856        if let Some(ri) = child.lock().await.take() {
857            // Push the new real inode to the front of vector.
858            self.add_upper_inode(ri, false).await;
859        }
860
861        Ok(())
862    }
863
864    // Add new upper RealInode to OverlayInode, clear all lower RealInodes if 'clear_lowers' is true.
865    async fn add_upper_inode(self: &Arc<Self>, ri: RealInode, clear_lowers: bool) {
866        let mut inodes = self.real_inodes.lock().await;
867        // Update self according to upper attribute.
868        self.whiteout.store(ri.whiteout, Ordering::Relaxed);
869
870        // Push the new real inode to the front of vector.
871        let mut new = vec![Arc::new(ri)];
872        // Drain lower RealInodes.
873        let lowers = inodes.drain(..).collect::<Vec<Arc<RealInode>>>();
874        if !clear_lowers {
875            // If not clear lowers, append them to the end of vector.
876            new.extend(lowers);
877        }
878        inodes.extend(new);
879    }
880
881    // return the uppder layer fs.
882    pub async fn in_upper_layer(&self) -> bool {
883        let all_inodes = self.real_inodes.lock().await;
884        let first = all_inodes.first();
885        match first {
886            Some(v) => v.in_upper_layer,
887            None => false,
888        }
889    }
890
891    pub async fn upper_layer_only(&self) -> bool {
892        let real_inodes = self.real_inodes.lock().await;
893        let first = real_inodes.first();
894        match first {
895            Some(v) => {
896                if !v.in_upper_layer {
897                    false
898                } else {
899                    real_inodes.len() == 1
900                }
901            }
902            None => false,
903        }
904    }
905
906    pub async fn first_layer_inode(&self) -> (Arc<BoxedLayer>, bool, u64) {
907        let all_inodes = self.real_inodes.lock().await;
908        let first = all_inodes.first();
909        match first {
910            Some(v) => (v.layer.clone(), v.in_upper_layer, v.inode),
911            None => panic!("BUG: dangling OverlayInode"),
912        }
913    }
914
915    pub async fn child(&self, name: &str) -> Option<Arc<OverlayInode>> {
916        self.childrens.lock().await.get(name).cloned()
917    }
918
919    pub async fn remove_child(&self, name: &str) -> Option<Arc<OverlayInode>> {
920        self.childrens.lock().await.remove(name)
921    }
922
923    pub async fn insert_child(&self, name: &str, node: Arc<OverlayInode>) {
924        self.childrens.lock().await.insert(name.to_string(), node);
925    }
926
927    /// Handles operations on the upper layer inode of an `OverlayInode` in a thread-safe manner.
928    ///
929    /// This function locks the `real_inodes` field of the `OverlayInode` and retrieves the first
930    /// real inode (if any). If the first inode exists and belongs to the upper layer (`in_upper_layer` is true),
931    /// the provided callback `f` is invoked with the inode wrapped in `Some`. Otherwise, `f` is invoked with `None`.
932    ///
933    /// # Arguments
934    /// * `f`: A closure that takes an `Option<RealInode>` and returns a future. The future resolves to a `Result<bool>`.
935    ///
936    /// # Returns
937    /// * `Ok(bool)`: The result of invoking the callback `f`.
938    /// * `Err(Erron)`: An error is returned if:
939    ///   - There are no backend inodes (`real_inodes` is empty), indicating a dangling `OverlayInode`.
940    ///   - The callback `f` itself returns an error.
941    ///
942    /// # Behavior
943    /// 1. Locks the `real_inodes` field to ensure thread safety.
944    /// 2. Checks if the first inode exists:
945    ///    - If it exists and is in the upper layer, invokes `f(Some(inode))`.
946    ///    - If it exists but is not in the upper layer, invokes `f(None)`.
947    /// 3. If no inodes exist, returns an error indicating a dangling `OverlayInode`.
948    ///
949    /// # Example Use Case
950    /// This function is typically used to perform operations on the upper layer inode of an `OverlayInode`,
951    /// such as creating, modifying, or deleting files/directories in the overlay filesystem's upper layer.
952    pub async fn handle_upper_inode_locked<F, Fut>(&self, f: F) -> Result<bool>
953    where
954        // Can pass a &RealInode (or None) to f for any lifetime 'a
955        F: FnOnce(Option<Arc<RealInode>>) -> Fut,
956        // f returns a Future that must live at least as long as 'a
957        Fut: Future<Output = Result<bool>>,
958    {
959        let all_inodes = self.real_inodes.lock().await;
960        let first = all_inodes.first();
961        match first {
962            Some(v) => {
963                if v.in_upper_layer {
964                    f(Some(v.clone())).await
965                } else {
966                    f(None).await
967                }
968            }
969            None => Err(Error::other(format!(
970                "BUG: dangling OverlayInode {} without any backend inode",
971                self.inode
972            ))),
973        }
974    }
975}
976#[allow(unused)]
977fn entry_type_from_mode(mode: libc::mode_t) -> u8 {
978    match mode & libc::S_IFMT {
979        libc::S_IFBLK => libc::DT_BLK,
980        libc::S_IFCHR => libc::DT_CHR,
981        libc::S_IFDIR => libc::DT_DIR,
982        libc::S_IFIFO => libc::DT_FIFO,
983        libc::S_IFLNK => libc::DT_LNK,
984        libc::S_IFREG => libc::DT_REG,
985        libc::S_IFSOCK => libc::DT_SOCK,
986        _ => libc::DT_UNKNOWN,
987    }
988}
989impl OverlayFs {
990    pub fn new(
991        upper: Option<Arc<BoxedLayer>>,
992        lowers: Vec<Arc<BoxedLayer>>,
993        params: Config,
994        root_inode: u64,
995    ) -> Result<Self> {
996        // load root inode
997        Ok(OverlayFs {
998            config: params,
999            lower_layers: lowers,
1000            upper_layer: upper,
1001            inodes: RwLock::new(InodeStore::new()),
1002            handles: Mutex::new(HashMap::new()),
1003            next_handle: AtomicU64::new(1),
1004            writeback: AtomicBool::new(false),
1005            no_open: AtomicBool::new(false),
1006            no_opendir: AtomicBool::new(false),
1007            killpriv_v2: AtomicBool::new(false),
1008            perfile_dax: AtomicBool::new(false),
1009            root_inodes: root_inode,
1010        })
1011    }
1012
1013    pub fn root_inode(&self) -> Inode {
1014        self.root_inodes
1015    }
1016
1017    async fn alloc_inode(&self, path: &str) -> Result<u64> {
1018        self.inodes.write().await.alloc_inode(path)
1019    }
1020
1021    /// Add a file layer and stack and merge the previous file layers.
1022    pub async fn push_layer(&mut self, layer: Arc<BoxedLayer>) -> Result<()> {
1023        let upper = self.upper_layer.take();
1024        if let Some(upper) = upper {
1025            self.lower_layers.push(upper);
1026        }
1027        self.upper_layer = Some(layer);
1028        // TODO: merge previous file layers. need optimization
1029        self.import().await?;
1030        Ok(())
1031    }
1032
1033    pub async fn import(&self) -> Result<()> {
1034        let mut root = OverlayInode::new();
1035        root.inode = self.root_inode();
1036        root.path = String::from("").into();
1037        root.name = String::from("").into();
1038        root.lookups = AtomicU64::new(2);
1039        root.real_inodes = Mutex::new(vec![]);
1040        let ctx = Request::default();
1041
1042        // Update upper inode
1043        if let Some(layer) = self.upper_layer.as_ref() {
1044            let ino = layer.root_inode();
1045            let real = RealInode::new(
1046                layer.clone(),
1047                true,
1048                ino,
1049                false,
1050                layer.is_opaque(ctx, ino).await?,
1051            )
1052            .await;
1053            root.real_inodes.lock().await.push(real.into());
1054        }
1055
1056        // Update lower inodes.
1057        for layer in self.lower_layers.iter() {
1058            let ino = layer.root_inode();
1059            let real: RealInode = RealInode::new(
1060                layer.clone(),
1061                false,
1062                ino,
1063                false,
1064                layer.is_opaque(ctx, ino).await?,
1065            )
1066            .await;
1067            root.real_inodes.lock().await.push(real.into());
1068        }
1069        let root_node = Arc::new(root);
1070
1071        // insert root inode into hash
1072        self.insert_inode(self.root_inode(), Arc::clone(&root_node))
1073            .await;
1074
1075        info!("loading root directory");
1076        self.load_directory(ctx, &root_node).await?;
1077        info!("loaded root directory");
1078
1079        Ok(())
1080    }
1081
1082    async fn root_node(&self) -> Arc<OverlayInode> {
1083        // Root node must exist.
1084        self.get_active_inode(self.root_inode()).await.unwrap()
1085    }
1086
1087    async fn insert_inode(&self, inode: u64, node: Arc<OverlayInode>) {
1088        self.inodes.write().await.insert_inode(inode, node).await;
1089    }
1090
1091    async fn get_active_inode(&self, inode: u64) -> Option<Arc<OverlayInode>> {
1092        self.inodes.read().await.get_inode(inode)
1093    }
1094
1095    // Get inode which is active or deleted.
1096    async fn get_all_inode(&self, inode: u64) -> Option<Arc<OverlayInode>> {
1097        let inode_store = self.inodes.read().await;
1098        match inode_store.get_inode(inode) {
1099            Some(n) => Some(n),
1100            None => inode_store.get_deleted_inode(inode),
1101        }
1102    }
1103
1104    // Return the inode only if it's permanently deleted from both self.inodes and self.deleted_inodes.
1105    async fn remove_inode(
1106        &self,
1107        inode: u64,
1108        path_removed: Option<String>,
1109    ) -> Option<Arc<OverlayInode>> {
1110        self.inodes
1111            .write()
1112            .await
1113            .remove_inode(inode, path_removed)
1114            .await
1115    }
1116
1117    // Lookup child OverlayInode with <name> under <parent> directory.
1118    // If name is empty, return parent itself.
1119    // Parent dir will be loaded, but returned OverlayInode won't.
1120    async fn lookup_node(
1121        &self,
1122        ctx: Request,
1123        parent: Inode,
1124        name: &str,
1125    ) -> Result<Arc<OverlayInode>> {
1126        if name.contains(SLASH_ASCII) {
1127            return Err(Error::from_raw_os_error(libc::EINVAL));
1128        }
1129
1130        // Parent inode is expected to be loaded before this function is called.
1131        // TODO: Is this correct?
1132        let pnode = match self.get_active_inode(parent).await {
1133            Some(v) => v,
1134            None => {
1135                match self.get_all_inode(parent).await {
1136                    Some(v) => {
1137                        trace!(
1138                            "overlayfs:mod.rs:1031:lookup_node: parent inode {parent} is deleted"
1139                        );
1140                        v
1141                    }
1142                    None => {
1143                        trace!(
1144                            "overlayfs:mod.rs:1034:lookup_node: parent inode {parent} not found"
1145                        );
1146                        // Parent inode is not found, return ENOENT.
1147                        return Err(Error::from_raw_os_error(libc::ENOENT));
1148                    }
1149                }
1150            }
1151        };
1152
1153        // Parent is whiteout-ed, return ENOENT.
1154        if pnode.whiteout.load(Ordering::Relaxed) {
1155            return Err(Error::from_raw_os_error(libc::ENOENT));
1156        }
1157
1158        let st = pnode.stat64(ctx).await?;
1159        if utils::is_dir(&st.attr.kind) && !pnode.loaded.load(Ordering::Relaxed) {
1160            // Parent is expected to be directory, load it first.
1161            self.load_directory(ctx, &pnode).await?;
1162        }
1163
1164        // Current file or dir.
1165        if name.eq(".")  
1166            // Root directory has no parent.
1167            || (parent == self.root_inode() && name.eq("..")) 
1168            // Special convention: empty name indicates current dir.
1169            || name.is_empty()
1170        {
1171            return Ok(Arc::clone(&pnode));
1172        }
1173
1174        match pnode.child(name).await {
1175            // Child is found.
1176            Some(v) => Ok(v),
1177            None => {
1178                trace!("lookup_node: child {name} not found");
1179                Err(Error::from_raw_os_error(libc::ENOENT))
1180            }
1181        }
1182    }
1183
1184    async fn lookup_node_ignore_enoent(
1185        &self,
1186        ctx: Request,
1187        parent: u64,
1188        name: &str,
1189    ) -> Result<Option<Arc<OverlayInode>>> {
1190        match self.lookup_node(ctx, parent, name).await {
1191            Ok(n) => Ok(Some(Arc::clone(&n))),
1192            Err(e) => {
1193                if let Some(raw_error) = e.raw_os_error()
1194                    && raw_error == libc::ENOENT
1195                {
1196                    return Ok(None);
1197                }
1198                Err(e)
1199            }
1200        }
1201    }
1202
    /// Populates `node`'s children from the result of `node.scan_childrens` and marks the
    /// node as loaded.
    ///
    /// Each scanned child gets a freshly allocated inode number, a weak link to `node` as
    /// its parent, an entry in `node.childrens` and an entry in the filesystem inode store.
    /// Returns immediately if `node` is already flagged as loaded.
    ///
    /// # Errors
    /// Fails when scanning the children or allocating an inode number fails.
    async fn load_directory(&self, ctx: Request, node: &Arc<OverlayInode>) -> Result<()> {
        // Fast path: nothing to do when the directory has been loaded before.
        if node.loaded.load(Ordering::Relaxed) {
            return Ok(());
        }

        // We got all childrens without inode.
        // The scan runs before either lock below is taken.
        let childrens = node.scan_childrens(ctx).await?;

        // =============== Start Lock Area ===================
        // Lock OverlayFs inodes.
        let mut inode_store = self.inodes.write().await;
        // Lock the OverlayInode and its childrens.
        let mut node_children = node.childrens.lock().await;

        // Check again in case another 'load_directory' function call gets locks and want to do duplicated work.
        if node.loaded.load(Ordering::Relaxed) {
            return Ok(());
        }

        // Now we have two locks' protection, Fs inodes lock and OverlayInode's childrens lock.
        for mut child in childrens.into_iter() {
            // Allocate inode for each child.
            let ino = inode_store.alloc_inode(&child.path.read().await)?;

            let name = child.name.read().await.clone();
            child.inode = ino;
            // Create bi-directional link between parent and child.
            child.parent = Mutex::new(Arc::downgrade(node));

            let arc_child = Arc::new(child);
            node_children.insert(name, arc_child.clone());
            // Record overlay inode in whole OverlayFs.
            inode_store.insert_inode(ino, arc_child).await;
        }

        // Set only after every child is registered, while both locks are still held.
        node.loaded.store(true, Ordering::Relaxed);

        Ok(())
    }
1247
1248    async fn forget_one(&self, inode: Inode, count: u64) {
1249        if inode == self.root_inode() || inode == 0 {
1250            return;
1251        }
1252
1253        let v = match self.get_all_inode(inode).await {
1254            Some(n) => n,
1255            None => {
1256                trace!("forget unknown inode: {inode}");
1257                return;
1258            }
1259        };
1260
1261        // Use fetch_update to atomically update lookups in a loop until it succeeds
1262        v.lookups
1263            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
1264                // If count is larger than current lookups, return 0
1265                // Otherwise subtract count from current lookups
1266                if current < count {
1267                    Some(0)
1268                } else {
1269                    Some(current - count)
1270                }
1271            })
1272            .expect("fetch_update failed");
1273
1274        let lookups = v.lookups.load(Ordering::Relaxed);
1275        trace!(
1276            "forget inode: {}, name {}, lookups: {}",
1277            inode,
1278            v.name.read().await,
1279            lookups
1280        );
1281        if lookups == 0 {
1282            debug!(
1283                "inode is forgotten: {}, name {}",
1284                inode,
1285                v.name.read().await
1286            );
1287            let _ = self.remove_inode(inode, None).await;
1288            let parent = v.parent.lock().await;
1289
1290            if let Some(p) = parent.upgrade() {
1291                // remove it from hashmap
1292                p.remove_child(&v.name.read().await).await;
1293            }
1294        }
1295    }
1296
1297    async fn do_lookup(&self, ctx: Request, parent: Inode, name: &str) -> Result<ReplyEntry> {
1298        let node = self.lookup_node(ctx, parent, name).await?;
1299        debug!("do_lookup: {name:?}, found");
1300
1301        if node.whiteout.load(Ordering::Relaxed) {
1302            eprintln!("Error: node.whiteout.load() called.");
1303            return Err(Error::from_raw_os_error(libc::ENOENT));
1304        }
1305
1306        let mut st = node.stat64(ctx).await?;
1307        st.attr.ino = node.inode;
1308        if utils::is_dir(&st.attr.kind) && !node.loaded.load(Ordering::Relaxed) {
1309            self.load_directory(ctx, &node).await?;
1310        }
1311
1312        // FIXME: can forget happen between found and increase reference counter?
1313        let tmp = node.lookups.fetch_add(1, Ordering::Relaxed);
1314        trace!("lookup count: {}", tmp + 1);
1315        Ok(ReplyEntry {
1316            ttl: st.ttl,
1317            attr: st.attr,
1318            generation: 0,
1319        })
1320    }
1321
1322    async fn do_statvfs(&self, ctx: Request, inode: Inode) -> Result<ReplyStatFs> {
1323        match self.get_active_inode(inode).await {
1324            Some(ovi) => {
1325                let all_inodes = ovi.real_inodes.lock().await;
1326                let real_inode = all_inodes
1327                    .first()
1328                    .ok_or(Error::other("backend inode not found"))?;
1329                Ok(real_inode.layer.statfs(ctx, real_inode.inode).await?)
1330            }
1331            None => Err(Error::from_raw_os_error(libc::ENOENT)),
1332        }
1333    }
1334
1335    #[allow(clippy::too_many_arguments)]
1336    async fn do_readdir<'a>(
1337        &self,
1338        ctx: Request,
1339        inode: Inode,
1340        handle: u64,
1341        offset: u64,
1342    ) -> Result<
1343        impl futures_util::stream::Stream<Item = std::result::Result<DirectoryEntry, Errno>> + Send + 'a,
1344    > {
1345        let snapshot = self.get_or_create_dir_snapshot(ctx, inode, handle).await?;
1346
1347        let entries: Vec<std::result::Result<DirectoryEntry, Errno>> =
1348            if offset < snapshot.len() as u64 {
1349                snapshot
1350                    .iter()
1351                    .skip(offset as usize)
1352                    .map(|entry| {
1353                        Ok(DirectoryEntry {
1354                            inode: entry.inode,
1355                            kind: entry.kind,
1356                            name: entry.name.clone(),
1357                            offset: entry.offset,
1358                        })
1359                    })
1360                    .collect()
1361            } else {
1362                vec![]
1363            };
1364
1365        Ok(iter(entries))
1366    }
1367
1368    #[allow(clippy::too_many_arguments)]
1369    async fn do_readdirplus<'a>(
1370        &self,
1371        ctx: Request,
1372        inode: Inode,
1373        handle: u64,
1374        offset: u64,
1375    ) -> Result<
1376        impl futures_util::stream::Stream<Item = std::result::Result<DirectoryEntryPlus, Errno>>
1377        + Send
1378        + 'a,
1379    > {
1380        let snapshot = self.get_or_create_dir_snapshot(ctx, inode, handle).await?;
1381
1382        let mut entries = Vec::new();
1383        if offset < snapshot.len() as u64 {
1384            for entry in snapshot.iter().skip(offset as usize) {
1385                // Increment lookup count for readdirplus as we are handing out a reference to the kernel.
1386                // We must do this here, not in snapshot creation, and we must NOT decrement it in HandleData drop.
1387                // The kernel will send a FORGET request when it's done with the entry.
1388                if let Some(node) = self.get_all_inode(entry.inode).await {
1389                    node.lookups.fetch_add(1, Ordering::Relaxed);
1390                }
1391                entries.push(Ok(entry.clone()));
1392            }
1393        }
1394
1395        Ok(iter(entries))
1396    }
1397
1398    async fn get_or_create_dir_snapshot(
1399        &self,
1400        ctx: Request,
1401        inode: Inode,
1402        handle: u64,
1403    ) -> Result<Vec<DirectoryEntryPlus>> {
1404        let handle_data = match self.handles.lock().await.get(&handle) {
1405            Some(hd) if hd.node.inode == inode => hd.clone(),
1406            _ => {
1407                // Fallback for cases without a valid handle (e.g. no-opendir)
1408                let node = self.lookup_node(ctx, inode, ".").await?;
1409                let st = node.stat64(ctx).await?;
1410                if !utils::is_dir(&st.attr.kind) {
1411                    return Err(Error::from_raw_os_error(libc::ENOTDIR));
1412                }
1413                // Create a temporary HandleData for this call only.
1414                Arc::new(HandleData {
1415                    node,
1416                    real_handle: None,
1417                    dir_snapshot: Mutex::new(None),
1418                })
1419            }
1420        };
1421
1422        // Optimistic check
1423        if let Some(snapshot) = handle_data.dir_snapshot.lock().await.as_ref() {
1424            return Ok(snapshot.clone());
1425        }
1426
1427        // Snapshot doesn't exist, create it.
1428        let ovl_inode = &handle_data.node;
1429        self.load_directory(ctx, ovl_inode).await?;
1430
1431        let mut entries = Vec::new();
1432
1433        // 1. Add "." entry
1434        let mut st_self = ovl_inode.stat64(ctx).await?;
1435        st_self.attr.ino = ovl_inode.inode;
1436        entries.push(DirectoryEntryPlus {
1437            inode: ovl_inode.inode,
1438            generation: 0,
1439            kind: st_self.attr.kind,
1440            name: ".".into(),
1441            offset: 1,
1442            attr: st_self.attr,
1443            entry_ttl: st_self.ttl,
1444            attr_ttl: st_self.ttl,
1445        });
1446
1447        // 2. Add ".." entry
1448        let parent_node = match ovl_inode.parent.lock().await.upgrade() {
1449            Some(node) => node,
1450            None => self.root_node().await,
1451        };
1452        let mut st_parent = parent_node.stat64(ctx).await?;
1453        st_parent.attr.ino = parent_node.inode;
1454        entries.push(DirectoryEntryPlus {
1455            inode: parent_node.inode,
1456            generation: 0,
1457            kind: st_parent.attr.kind,
1458            name: "..".into(),
1459            offset: 2,
1460            attr: st_parent.attr,
1461            entry_ttl: st_parent.ttl,
1462            attr_ttl: st_parent.ttl,
1463        });
1464
1465        // 3. Add children entries
1466        let children = ovl_inode.childrens.lock().await;
1467        for (name, child) in children.iter() {
1468            if child.whiteout.load(Ordering::Relaxed) {
1469                continue;
1470            }
1471            let mut st_child = child.stat64(ctx).await?;
1472            st_child.attr.ino = child.inode;
1473            entries.push(DirectoryEntryPlus {
1474                inode: child.inode,
1475                generation: 0,
1476                kind: st_child.attr.kind,
1477                name: name.clone().into(),
1478                offset: (entries.len() + 1) as i64,
1479                attr: st_child.attr,
1480                entry_ttl: st_child.ttl,
1481                attr_ttl: st_child.ttl,
1482            });
1483        }
1484        drop(children);
1485
1486        let mut snapshot_guard = handle_data.dir_snapshot.lock().await;
1487        if snapshot_guard.is_none() {
1488            // We won the race, install our prepared snapshot.
1489            *snapshot_guard = Some(entries.clone());
1490            Ok(entries)
1491        } else {
1492            // Another thread won the race while we were preparing.
1493            // Discard our work and use the existing snapshot.
1494            Ok(snapshot_guard.as_ref().unwrap().clone())
1495        }
1496    }
1497
    /// Creates the directory `name` under `parent_node` in the upper layer and registers
    /// the resulting overlay inode.
    ///
    /// If a whiteout-ed node with the same name exists, its whiteout is deleted when that
    /// node is in the upper layer, and the new directory is marked opaque when that node
    /// is not upper-layer-only. The parent is copied up before the directory is created.
    ///
    /// # Errors
    /// * `EROFS` if there is no upper layer.
    /// * `ENOENT` if the parent is whiteout-ed.
    /// * `EEXIST` if a non-whiteout node called `name` already exists.
    /// * `EINVAL` if the parent has no upper inode after copy-up.
    /// * Any error from lookup, copy-up, inode allocation, `mkdir` or `set_opaque`.
    async fn do_mkdir(
        &self,
        ctx: Request,
        parent_node: Arc<OverlayInode>,
        name: &str,
        mode: u32,
        umask: u32,
    ) -> Result<()> {
        // Without an upper layer the filesystem is read-only.
        if self.upper_layer.is_none() {
            return Err(Error::from_raw_os_error(libc::EROFS));
        }

        // Parent node was deleted.
        if parent_node.whiteout.load(Ordering::Relaxed) {
            return Err(Error::from_raw_os_error(libc::ENOENT));
        }

        let mut delete_whiteout = false;
        let mut set_opaque = false;
        if let Some(n) = self
            .lookup_node_ignore_enoent(ctx, parent_node.inode, name)
            .await?
        {
            // Node with same name exists, let's check if it's whiteout.
            if !n.whiteout.load(Ordering::Relaxed) {
                return Err(Error::from_raw_os_error(libc::EEXIST));
            }

            // The whiteout is only deleted when the existing node is in the upper layer.
            if n.in_upper_layer().await {
                delete_whiteout = true;
            }

            // Set opaque if child dir has lower layers.
            if !n.upper_layer_only().await {
                set_opaque = true;
            }
        }

        // Copy parent node up if necessary.
        let pnode = self.copy_node_up(ctx, parent_node).await?;

        let path = format!("{}/{}", pnode.path.read().await, name);
        let path_ref = &path;
        // Slot the closure below fills with the newly built overlay inode.
        let new_node = Arc::new(Mutex::new(None));
        pnode
            .handle_upper_inode_locked(&mut |parent_real_inode: Option<Arc<RealInode>>| async {
                let parent_real_inode = match parent_real_inode {
                    Some(inode) => inode,
                    None => {
                        error!("BUG: parent doesn't have upper inode after copied up");
                        return Err(Error::from_raw_os_error(libc::EINVAL));
                    }
                };
                let osstr = OsStr::new(name);
                if delete_whiteout {
                    // Best effort: a failure to delete the whiteout is ignored.
                    let _ = parent_real_inode
                        .layer
                        .delete_whiteout(ctx, parent_real_inode.inode, osstr)
                        .await;
                }

                // Allocate inode number.
                let ino = self.alloc_inode(path_ref).await?;
                let child_dir = parent_real_inode.mkdir(ctx, name, mode, umask).await?;
                // Set opaque if child dir has lower layers.
                if set_opaque {
                    parent_real_inode
                        .layer
                        .set_opaque(ctx, child_dir.inode)
                        .await?;
                }
                let ovi =
                    OverlayInode::new_from_real_inode(name, ino, path_ref.clone(), child_dir).await;
                new_node.lock().await.replace(ovi);
                Ok(false)
            })
            .await?;

        // new_node is always 'Some'
        // (reaching this point means the closure above ran to completion and stored it).
        let nn = new_node.lock().await.take();
        let arc_node = Arc::new(nn.unwrap());
        // Register the new inode globally, then under its parent.
        self.insert_inode(arc_node.inode, arc_node.clone()).await;
        pnode.insert_child(name, arc_node).await;
        Ok(())
    }
1583
1584    async fn do_mknod(
1585        &self,
1586        ctx: Request,
1587        parent_node: &Arc<OverlayInode>,
1588        name: &str,
1589        mode: u32,
1590        rdev: u32,
1591        umask: u32,
1592    ) -> Result<()> {
1593        if self.upper_layer.is_none() {
1594            return Err(Error::from_raw_os_error(libc::EROFS));
1595        }
1596
1597        // Parent node was deleted.
1598        if parent_node.whiteout.load(Ordering::Relaxed) {
1599            return Err(Error::from_raw_os_error(libc::ENOENT));
1600        }
1601
1602        match self
1603            .lookup_node_ignore_enoent(ctx, parent_node.inode, name)
1604            .await?
1605        {
1606            Some(n) => {
1607                // Node with same name exists, let's check if it's whiteout.
1608                if !n.whiteout.load(Ordering::Relaxed) {
1609                    return Err(Error::from_raw_os_error(libc::EEXIST));
1610                }
1611
1612                // Copy parent node up if necessary.
1613                let pnode = self.copy_node_up(ctx, Arc::clone(parent_node)).await?;
1614                pnode
1615                    .handle_upper_inode_locked(
1616                        &mut |parent_real_inode: Option<Arc<RealInode>>| async {
1617                            let parent_real_inode = match parent_real_inode {
1618                                Some(inode) => inode,
1619                                None => {
1620                                    error!("BUG: parent doesn't have upper inode after copied up");
1621                                    return Err(Error::from_raw_os_error(libc::EINVAL));
1622                                }
1623                            };
1624                            let osstr = OsStr::new(name);
1625                            if n.in_upper_layer().await {
1626                                let _ = parent_real_inode
1627                                    .layer
1628                                    .delete_whiteout(ctx, parent_real_inode.inode, osstr)
1629                                    .await;
1630                            }
1631
1632                            let child_ri = parent_real_inode
1633                                .mknod(ctx, name, mode, rdev, umask)
1634                                .await?;
1635
1636                            // Replace existing real inodes with new one.
1637                            n.add_upper_inode(child_ri, true).await;
1638                            Ok(false)
1639                        },
1640                    )
1641                    .await?;
1642            }
1643            None => {
1644                // Copy parent node up if necessary.
1645                let pnode = self.copy_node_up(ctx, Arc::clone(parent_node)).await?;
1646                let new_node = Arc::new(Mutex::new(None));
1647                let path = format!("{}/{}", pnode.path.read().await, name);
1648                pnode
1649                    .handle_upper_inode_locked(
1650                        &mut |parent_real_inode: Option<Arc<RealInode>>| async {
1651                            let parent_real_inode = match parent_real_inode {
1652                                Some(inode) => inode,
1653                                None => {
1654                                    error!("BUG: parent doesn't have upper inode after copied up");
1655                                    return Err(Error::from_raw_os_error(libc::EINVAL));
1656                                }
1657                            };
1658
1659                            // Allocate inode number.
1660                            let ino = self.alloc_inode(&path).await?;
1661                            let child_ri = parent_real_inode
1662                                .mknod(ctx, name, mode, rdev, umask)
1663                                .await?;
1664                            let ovi = OverlayInode::new_from_real_inode(
1665                                name,
1666                                ino,
1667                                path.clone(),
1668                                child_ri,
1669                            )
1670                            .await;
1671
1672                            new_node.lock().await.replace(ovi);
1673                            Ok(false)
1674                        },
1675                    )
1676                    .await?;
1677
1678                let nn = new_node.lock().await.take();
1679                let arc_node = Arc::new(nn.unwrap());
1680                self.insert_inode(arc_node.inode, arc_node.clone()).await;
1681                pnode.insert_child(name, arc_node).await;
1682            }
1683        }
1684
1685        Ok(())
1686    }
1687
1688    async fn do_create(
1689        &self,
1690        ctx: Request,
1691        parent_node: &Arc<OverlayInode>,
1692        name: &OsStr,
1693        mode: u32,
1694        flags: u32,
1695    ) -> Result<Option<u64>> {
1696        let name_str = name.to_str().unwrap();
1697        let upper = self
1698            .upper_layer
1699            .as_ref()
1700            .cloned()
1701            .ok_or_else(|| Error::from_raw_os_error(libc::EROFS))?;
1702
1703        // Parent node was deleted.
1704        if parent_node.whiteout.load(Ordering::Relaxed) {
1705            return Err(Error::from_raw_os_error(libc::ENOENT));
1706        }
1707
1708        let handle: Arc<Mutex<Option<u64>>> = Arc::new(Mutex::new(None));
1709        let real_ino: Arc<Mutex<Option<u64>>> = Arc::new(Mutex::new(None));
1710        let new_ovi = match self
1711            .lookup_node_ignore_enoent(ctx, parent_node.inode, name_str)
1712            .await?
1713        {
1714            Some(n) => {
1715                // Node with same name exists, let's check if it's whiteout.
1716                if !n.whiteout.load(Ordering::Relaxed) {
1717                    return Err(Error::from_raw_os_error(libc::EEXIST));
1718                }
1719
1720                // Copy parent node up if necessary.
1721                let pnode = self.copy_node_up(ctx, Arc::clone(parent_node)).await?;
1722                pnode
1723                    .handle_upper_inode_locked(
1724                        &mut |parent_real_inode: Option<Arc<RealInode>>| async {
1725                            let parent_real_inode = match parent_real_inode {
1726                                Some(inode) => inode,
1727                                None => {
1728                                    error!("BUG: parent doesn't have upper inode after copied up");
1729                                    return Err(Error::from_raw_os_error(libc::EINVAL));
1730                                }
1731                            };
1732
1733                            if n.in_upper_layer().await {
1734                                let _ = parent_real_inode
1735                                    .layer
1736                                    .delete_whiteout(ctx, parent_real_inode.inode, name)
1737                                    .await;
1738                            }
1739
1740                            let (child_ri, hd) =
1741                                parent_real_inode.create(ctx, name_str, mode, flags).await?;
1742                            real_ino.lock().await.replace(child_ri.inode);
1743                            handle.lock().await.replace(hd.unwrap());
1744
1745                            // Replace existing real inodes with new one.
1746                            n.add_upper_inode(child_ri, true).await;
1747                            Ok(false)
1748                        },
1749                    )
1750                    .await?;
1751                n.clone()
1752            }
1753            None => {
1754                // Copy parent node up if necessary.
1755                let pnode = self.copy_node_up(ctx, Arc::clone(parent_node)).await?;
1756                let new_node = Arc::new(Mutex::new(None));
1757                let path = format!("{}/{}", pnode.path.read().await, name_str);
1758                pnode
1759                    .handle_upper_inode_locked(
1760                        &mut |parent_real_inode: Option<Arc<RealInode>>| async {
1761                            let parent_real_inode = match parent_real_inode {
1762                                Some(inode) => inode,
1763                                None => {
1764                                    error!("BUG: parent doesn't have upper inode after copied up");
1765                                    return Err(Error::from_raw_os_error(libc::EINVAL));
1766                                }
1767                            };
1768
1769                            let (child_ri, hd) =
1770                                parent_real_inode.create(ctx, name_str, mode, flags).await?;
1771                            real_ino.lock().await.replace(child_ri.inode);
1772                            handle.lock().await.replace(hd.unwrap());
1773                            // Allocate inode number.
1774                            let ino = self.alloc_inode(&path).await?;
1775                            let ovi = OverlayInode::new_from_real_inode(
1776                                name_str,
1777                                ino,
1778                                path.clone(),
1779                                child_ri,
1780                            )
1781                            .await;
1782
1783                            new_node.lock().await.replace(ovi);
1784                            Ok(false)
1785                        },
1786                    )
1787                    .await?;
1788
1789                // new_node is always 'Some'
1790                let nn = new_node.lock().await.take();
1791                let arc_node = Arc::new(nn.unwrap());
1792                self.insert_inode(arc_node.inode, arc_node.clone()).await;
1793                pnode.insert_child(name_str, arc_node.clone()).await;
1794                arc_node
1795            }
1796        };
1797
1798        let final_handle = match *handle.lock().await {
1799            Some(hd) => {
1800                if self.no_open.load(Ordering::Relaxed) {
1801                    None
1802                } else {
1803                    let handle = self.next_handle.fetch_add(1, Ordering::Relaxed);
1804                    let handle_data = HandleData {
1805                        node: new_ovi,
1806                        real_handle: Some(RealHandle {
1807                            layer: upper.clone(),
1808                            in_upper_layer: true,
1809                            inode: real_ino.lock().await.unwrap(),
1810                            handle: AtomicU64::new(hd),
1811                        }),
1812                        dir_snapshot: Mutex::new(None),
1813                    };
1814                    self.handles
1815                        .lock()
1816                        .await
1817                        .insert(handle, Arc::new(handle_data));
1818                    Some(handle)
1819                }
1820            }
1821            None => None,
1822        };
1823        Ok(final_handle)
1824    }
1825
1826    async fn do_rename(
1827        &self,
1828        req: Request,
1829        parent: Inode,
1830        name: &OsStr,
1831        new_parent: Inode,
1832        new_name: &OsStr,
1833    ) -> Result<()> {
1834        let name_str = name.to_str().unwrap();
1835        let new_name_str = new_name.to_str().unwrap();
1836
1837        let parent_node = self.lookup_node(req, parent, "").await?;
1838        let new_parent_node = self.lookup_node(req, new_parent, "").await?;
1839        let src_node = self.lookup_node(req, parent, name_str).await?;
1840        let dest_node_opt = self
1841            .lookup_node_ignore_enoent(req, new_parent, new_name_str)
1842            .await?;
1843        // trace!("parent_node: {}, new_parent_node: {}, src_node: {}, dest_node_opt: {:?}", parent_node.inode, new_parent_node.inode, src_node.inode, dest_node_opt.as_ref().map(|n| n.inode));
1844
1845        if let Some(dest_node) = &dest_node_opt {
1846            let src_is_dir = src_node.is_dir(req).await?;
1847            let dest_is_dir = dest_node.is_dir(req).await?;
1848            if src_is_dir != dest_is_dir {
1849                return Err(Error::from_raw_os_error(libc::EISDIR));
1850            }
1851            if dest_is_dir {
1852                self.copy_directory_up(req, dest_node.clone()).await?;
1853                let (count, _) = dest_node.count_entries_and_whiteout(req).await?;
1854                if count > 0 {
1855                    return Err(Error::from_raw_os_error(libc::ENOTEMPTY));
1856                }
1857            }
1858        }
1859
1860        let pnode = self.copy_node_up(req, parent_node).await?;
1861        let new_pnode = self.copy_node_up(req, new_parent_node).await?;
1862        let s_node = self.copy_node_up(req, src_node).await?;
1863
1864        let need_whiteout = !s_node.upper_layer_only().await;
1865
1866        let (p_layer, _, p_inode) = pnode.first_layer_inode().await;
1867        let (new_p_layer, _, new_p_inode) = new_pnode.first_layer_inode().await;
1868        assert!(Arc::ptr_eq(&p_layer, &new_p_layer));
1869
1870        p_layer
1871            .rename(req, p_inode, name, new_p_inode, new_name)
1872            .await?;
1873
1874        // Handle the replaced destination node (if any).
1875        if let Some(dest_node) = dest_node_opt {
1876            let path = dest_node.path.read().await.clone();
1877            self.remove_inode(dest_node.inode, Some(path)).await;
1878        }
1879
1880        // Update the moved source node's state.
1881
1882        // Remove from old parent.
1883        pnode.remove_child(name_str).await;
1884        self.remove_inode(s_node.inode, s_node.path.read().await.clone().into())
1885            .await;
1886        let new_path = format!("{}/{}", new_pnode.path.read().await, new_name_str);
1887        *s_node.path.write().await = new_path;
1888        *s_node.name.write().await = new_name_str.to_string();
1889        *s_node.parent.lock().await = Arc::downgrade(&new_pnode);
1890        new_pnode.insert_child(new_name_str, s_node.clone()).await;
1891        self.insert_inode(s_node.inode, s_node).await;
1892
1893        // Create whiteout at the old location if necessary.
1894        if need_whiteout {
1895            p_layer.create_whiteout(req, p_inode, name).await?;
1896        }
1897
1898        Ok(())
1899    }
1900
1901    async fn do_link(
1902        &self,
1903        ctx: Request,
1904        src_node: &Arc<OverlayInode>,
1905        new_parent: &Arc<OverlayInode>,
1906        name: &str,
1907    ) -> Result<()> {
1908        if self.upper_layer.is_none() {
1909            return Err(Error::from_raw_os_error(libc::EROFS));
1910        }
1911
1912        // Node is whiteout.
1913        if src_node.whiteout.load(Ordering::Relaxed) || new_parent.whiteout.load(Ordering::Relaxed)
1914        {
1915            return Err(Error::from_raw_os_error(libc::ENOENT));
1916        }
1917
1918        let st = src_node.stat64(ctx).await?;
1919        if utils::is_dir(&st.attr.kind) {
1920            // Directory can't be hardlinked.
1921            return Err(Error::from_raw_os_error(libc::EPERM));
1922        }
1923
1924        let src_node = self.copy_node_up(ctx, Arc::clone(src_node)).await?;
1925        let new_parent = self.copy_node_up(ctx, Arc::clone(new_parent)).await?;
1926        let src_ino = src_node.first_layer_inode().await.2;
1927
1928        if let Some(existing_node) = self
1929            .lookup_node_ignore_enoent(ctx, new_parent.inode, name)
1930            .await?
1931        {
1932            // If it's not a whiteout, it's an error
1933            if !existing_node.whiteout.load(Ordering::Relaxed) {
1934                return Err(Error::from_raw_os_error(libc::EEXIST));
1935            }
1936            // If it is a whiteout, we will overwrite it.
1937            // First, remove the physical whiteout file in the upper layer.
1938            new_parent
1939                .handle_upper_inode_locked(&mut |parent_real_inode: Option<Arc<RealInode>>| async {
1940                    let parent_ri = parent_real_inode.ok_or_else(|| {
1941                        error!("BUG: parent doesn't have upper inode after copied up");
1942                        Error::from_raw_os_error(libc::EINVAL)
1943                    })?;
1944                    // Only delete if the whiteout is in the upper layer
1945                    if existing_node.in_upper_layer().await {
1946                        let _ = parent_ri
1947                            .layer
1948                            .delete_whiteout(ctx, parent_ri.inode, OsStr::new(name))
1949                            .await;
1950                    }
1951                    Ok(false)
1952                })
1953                .await?;
1954        }
1955
1956        new_parent
1957            .handle_upper_inode_locked(&mut |parent_real_inode: Option<Arc<RealInode>>| async {
1958                let parent_real_inode = match parent_real_inode {
1959                    Some(inode) => inode,
1960                    None => {
1961                        error!("BUG: parent doesn't have upper inode after copied up");
1962                        return Err(Error::from_raw_os_error(libc::EINVAL));
1963                    }
1964                };
1965
1966                parent_real_inode.link(ctx, src_ino, name).await?;
1967
1968                Ok(false)
1969            })
1970            .await?;
1971
1972        self.insert_inode(src_node.inode, src_node.clone()).await;
1973        new_parent.insert_child(name, src_node).await;
1974
1975        Ok(())
1976    }
1977
1978    async fn do_symlink(
1979        &self,
1980        ctx: Request,
1981        linkname: &str,
1982        parent_node: &Arc<OverlayInode>,
1983        name: &str,
1984    ) -> Result<()> {
1985        let name_os = OsStr::new(name);
1986        if self.upper_layer.is_none() {
1987            return Err(Error::from_raw_os_error(libc::EROFS));
1988        }
1989
1990        // parent was deleted.
1991        if parent_node.whiteout.load(Ordering::Relaxed) {
1992            return Err(Error::from_raw_os_error(libc::ENOENT));
1993        }
1994
1995        match self
1996            .lookup_node_ignore_enoent(ctx, parent_node.inode, name)
1997            .await?
1998        {
1999            Some(n) => {
2000                // Node with same name exists, let's check if it's whiteout.
2001                if !n.whiteout.load(Ordering::Relaxed) {
2002                    return Err(Error::from_raw_os_error(libc::EEXIST));
2003                }
2004
2005                // Copy parent node up if necessary.
2006                let pnode = self.copy_node_up(ctx, Arc::clone(parent_node)).await?;
2007                pnode
2008                    .handle_upper_inode_locked(
2009                        &mut |parent_real_inode: Option<Arc<RealInode>>| async {
2010                            let parent_real_inode = match parent_real_inode {
2011                                Some(inode) => inode,
2012                                None => {
2013                                    error!("BUG: parent doesn't have upper inode after copied up");
2014                                    return Err(Error::from_raw_os_error(libc::EINVAL));
2015                                }
2016                            };
2017
2018                            if n.in_upper_layer().await {
2019                                let _ = parent_real_inode
2020                                    .layer
2021                                    .delete_whiteout(ctx, parent_real_inode.inode, name_os)
2022                                    .await;
2023                            }
2024
2025                            let child_ri = parent_real_inode.symlink(ctx, linkname, name).await?;
2026
2027                            // Replace existing real inodes with new one.
2028                            n.add_upper_inode(child_ri, true).await;
2029                            Ok(false)
2030                        },
2031                    )
2032                    .await?;
2033            }
2034            None => {
2035                // Copy parent node up if necessary.
2036                let pnode = self.copy_node_up(ctx, Arc::clone(parent_node)).await?;
2037                let new_node: Arc<Mutex<Option<OverlayInode>>> = Arc::new(Mutex::new(None));
2038                let path = format!("{}/{}", pnode.path.read().await, name);
2039                pnode
2040                    .handle_upper_inode_locked(
2041                        &mut |parent_real_inode: Option<Arc<RealInode>>| async {
2042                            let parent_real_inode = match parent_real_inode {
2043                                Some(inode) => inode,
2044                                None => {
2045                                    error!("BUG: parent doesn't have upper inode after copied up");
2046                                    return Err(Error::from_raw_os_error(libc::EINVAL));
2047                                }
2048                            };
2049
2050                            // Allocate inode number.
2051                            let ino = self.alloc_inode(&path).await?;
2052                            let child_ri = parent_real_inode.symlink(ctx, linkname, name).await?;
2053                            let ovi = OverlayInode::new_from_real_inode(
2054                                name,
2055                                ino,
2056                                path.clone(),
2057                                child_ri,
2058                            )
2059                            .await;
2060
2061                            new_node.lock().await.replace(ovi);
2062                            Ok(false)
2063                        },
2064                    )
2065                    .await?;
2066
2067                // new_node is always 'Some'
2068                let arc_node = Arc::new(new_node.lock().await.take().unwrap());
2069                self.insert_inode(arc_node.inode, arc_node.clone()).await;
2070                pnode.insert_child(name, arc_node).await;
2071            }
2072        }
2073
2074        Ok(())
2075    }
2076
2077    /// Copies a symbolic link from a lower layer to the upper layer.
2078    ///
2079    /// This function is a part of the copy-up process, triggered when a symlink that
2080    /// only exists in a lower layer is modified. It reads the link target and attributes
2081    /// from the lower layer and creates an identical symlink in the upper layer, crucially
2082    /// preserving the original host UID and GID.
2083    async fn copy_symlink_up(
2084        &self,
2085        ctx: Request,
2086        node: Arc<OverlayInode>,
2087    ) -> Result<Arc<OverlayInode>> {
2088        if node.in_upper_layer().await {
2089            return Ok(node);
2090        }
2091
2092        let parent_node = if let Some(ref n) = node.parent.lock().await.upgrade() {
2093            Arc::clone(n)
2094        } else {
2095            return Err(Error::other("no parent?"));
2096        };
2097
2098        // To preserve original ownership, we must get the raw, unmapped host attributes.
2099        // We achieve this by calling `do_getattr_helper`, which is specifically designed
2100        // to bypass the ID mapping logic. This is safe and does not affect other
2101        // functionalities because `do_getattr_helper` and the standard `stat64()` call
2102        // both rely on the same underlying `stat` system call; they only differ in
2103        // whether the resulting `uid` and `gid` are mapped.
2104        let (self_layer, _, self_inode) = node.first_layer_inode().await;
2105        let re = self_layer
2106            .getattr_with_mapping(self_inode, None, false)
2107            .await?;
2108        let st = ReplyAttr {
2109            ttl: re.1,
2110            attr: convert_stat64_to_file_attr(re.0),
2111        };
2112
2113        if !parent_node.in_upper_layer().await {
2114            parent_node.clone().create_upper_dir(ctx, None).await?;
2115        }
2116
2117        // Read the linkname from lower layer.
2118        let reply_data = self_layer.readlink(ctx, self_inode).await?;
2119        // Convert path to &str.
2120        let path = std::str::from_utf8(&reply_data.data)
2121            .map_err(|_| Error::from_raw_os_error(libc::EINVAL))?;
2122
2123        let new_upper_real: Arc<Mutex<Option<RealInode>>> = Arc::new(Mutex::new(None));
2124        parent_node
2125            .handle_upper_inode_locked(&mut |parent_upper_inode: Option<Arc<RealInode>>| async {
2126                // We already create upper dir for parent_node above.
2127                let parent_real_inode =
2128                    parent_upper_inode.ok_or_else(|| Error::from_raw_os_error(libc::EROFS))?;
2129                // We manually unfold the `symlink` logic here instead of calling the `symlink` method directly.
2130                // This is necessary to preserve the original file's UID and GID during the copy-up process.
2131                if !parent_real_inode.in_upper_layer {
2132                    return Err(Error::from_raw_os_error(libc::EROFS));
2133                }
2134                let link_name = OsStr::new(path);
2135                let filename = node.name.read().await;
2136                let filename = OsStr::new(filename.as_str());
2137                let op_ctx = crate::context::OperationContext::with_credentials(
2138                    ctx,
2139                    st.attr.uid,
2140                    st.attr.gid,
2141                );
2142                let entry = parent_real_inode
2143                    .layer
2144                    .symlink_with_context(op_ctx, parent_real_inode.inode, filename, link_name)
2145                    .await?;
2146                let ri = RealInode {
2147                    layer: parent_real_inode.layer.clone(),
2148                    in_upper_layer: true,
2149                    inode: entry.attr.ino,
2150                    whiteout: false,
2151                    opaque: false,
2152                    stat: Some(ReplyAttr {
2153                        ttl: entry.ttl,
2154                        attr: entry.attr,
2155                    }),
2156                };
2157                new_upper_real.lock().await.replace(ri);
2158                Ok(false)
2159            })
2160            .await?;
2161
2162        if let Some(real_inode) = new_upper_real.lock().await.take() {
2163            // update upper_inode and first_inode()
2164            node.add_upper_inode(real_inode, true).await;
2165        }
2166
2167        Ok(node)
2168    }
2169
2170    /// Copies a regular file and its contents from a lower layer to the upper layer.
2171    ///
2172    /// This function is a core part of the copy-up process, triggered when a regular file
2173    /// that only exists in a lower layer is written to. It creates an empty file in the
2174    /// upper layer with the original file's attributes (mode, UID, GID), and then copies
2175    /// the entire content from the lower layer file to the new upper layer file.
2176    async fn copy_regfile_up(
2177        &self,
2178        ctx: Request,
2179        node: Arc<OverlayInode>,
2180    ) -> Result<Arc<OverlayInode>> {
2181        if node.in_upper_layer().await {
2182            return Ok(node);
2183        }
2184
2185        let parent_node = if let Some(ref n) = node.parent.lock().await.upgrade() {
2186            Arc::clone(n)
2187        } else {
2188            return Err(Error::other("no parent?"));
2189        };
2190
2191        // To preserve original ownership, we must get the raw, unmapped host attributes.
2192        // We achieve this by calling `do_getattr_helper`, which is specifically designed
2193        // to bypass the ID mapping logic. This is safe and does not affect other
2194        // functionalities because `do_getattr_helper` and the standard `stat64()` call
2195        // both rely on the same underlying `stat` system call; they only differ in
2196        // whether the resulting `uid` and `gid` are mapped.
2197        let (lower_layer, _, lower_inode) = node.first_layer_inode().await;
2198        let re = lower_layer
2199            .getattr_with_mapping(lower_inode, None, false)
2200            .await?;
2201        let st = ReplyAttr {
2202            ttl: re.1,
2203            attr: convert_stat64_to_file_attr(re.0),
2204        };
2205        trace!(
2206            "copy_regfile_up: node {} in lower layer's inode {}",
2207            node.inode, lower_inode
2208        );
2209
2210        if !parent_node.in_upper_layer().await {
2211            parent_node.clone().create_upper_dir(ctx, None).await?;
2212        }
2213
2214        // create the file in upper layer using information from lower layer
2215
2216        let flags = libc::O_WRONLY;
2217        let mode = mode_from_kind_and_perm(st.attr.kind, st.attr.perm);
2218
2219        let upper_handle = Arc::new(Mutex::new(0));
2220        let upper_real_inode = Arc::new(Mutex::new(None));
2221        parent_node
2222            .handle_upper_inode_locked(&mut |parent_upper_inode: Option<Arc<RealInode>>| async {
2223                // We already create upper dir for parent_node.
2224                let parent_real_inode = parent_upper_inode.ok_or_else(|| {
2225                    error!("parent {} has no upper inode", parent_node.inode);
2226                    Error::from_raw_os_error(libc::EINVAL)
2227                })?;
2228                // We manually unfold the `create` logic here instead of calling the `create` method directly.
2229                // This is necessary to preserve the original file's UID and GID during the copy-up process.
2230                if !parent_real_inode.in_upper_layer {
2231                    return Err(Error::from_raw_os_error(libc::EROFS));
2232                }
2233                let name = node.name.read().await;
2234                let name = OsStr::new(name.as_str());
2235                let op_ctx = crate::context::OperationContext::with_credentials(
2236                    ctx,
2237                    st.attr.uid,
2238                    st.attr.gid,
2239                );
2240                let create_rep = parent_real_inode
2241                    .layer
2242                    .create_with_context(
2243                        op_ctx,
2244                        parent_real_inode.inode,
2245                        name,
2246                        mode,
2247                        flags.try_into().unwrap(),
2248                    )
2249                    .await?;
2250
2251                let (inode, h) = (
2252                    RealInode {
2253                        layer: parent_real_inode.layer.clone(),
2254                        in_upper_layer: true,
2255                        inode: create_rep.attr.ino,
2256                        whiteout: false,
2257                        opaque: false,
2258                        stat: Some(ReplyAttr {
2259                            ttl: create_rep.ttl,
2260                            attr: create_rep.attr,
2261                        }),
2262                    },
2263                    Some(create_rep.fh),
2264                );
2265                trace!(
2266                    "copy_regfile_up: created upper file {name:?} with inode {}",
2267                    inode.inode
2268                );
2269                *upper_handle.lock().await = h.unwrap_or(0);
2270                upper_real_inode.lock().await.replace(inode);
2271                Ok(false)
2272            })
2273            .await?;
2274
2275        let rep = lower_layer
2276            .open(ctx, lower_inode, libc::O_RDONLY as u32)
2277            .await?;
2278
2279        let lower_handle = rep.fh;
2280
2281        // need to use work directory and then rename file to
2282        // final destination for atomic reasons.. not deal with it for now,
2283        // use stupid copy at present.
2284        // FIXME: this need a lot of work here, ntimes, xattr, etc.
2285
2286        // Copy from lower real inode to upper real inode.
2287        // TODO: use sendfile here.
2288
2289        let u_handle = *upper_handle.lock().await;
2290        let ri = upper_real_inode.lock().await.take();
2291        if let Some(ri) = ri {
2292            let mut offset: usize = 0;
2293            let size = 4 * 1024 * 1024;
2294
2295            loop {
2296                let ret = lower_layer
2297                    .read(ctx, lower_inode, lower_handle, offset as u64, size)
2298                    .await?;
2299
2300                let len = ret.data.len();
2301                if len == 0 {
2302                    break;
2303                }
2304
2305                let ret = ri
2306                    .layer
2307                    .write(ctx, ri.inode, u_handle, offset as u64, &ret.data, 0, 0)
2308                    .await?;
2309
2310                assert_eq!(ret.written as usize, len);
2311                offset += ret.written as usize;
2312            }
2313
2314            if let Err(e) = ri.layer.release(ctx, ri.inode, u_handle, 0, 0, true).await {
2315                let e: std::io::Error = e.into();
2316                // Ignore ENOSYS.
2317                if e.raw_os_error() != Some(libc::ENOSYS) {
2318                    return Err(e);
2319                }
2320            }
2321            node.add_upper_inode(ri, true).await;
2322        } else {
2323            error!("BUG: upper real inode is None after copy up");
2324        }
2325
2326        lower_layer
2327            .release(ctx, lower_inode, lower_handle, 0, 0, true)
2328            .await?;
2329
2330        Ok(Arc::clone(&node))
2331    }
2332
2333    /// Copies the specified node to the upper layer of the filesystem
2334    ///
2335    /// Performs different operations based on the node type:
2336    /// - **Directory**: Creates a corresponding directory in the upper layer
2337    /// - **Symbolic link**: Recursively copies to the upper layer
2338    /// - **Regular file**: Copies file content to the upper layer
2339    ///
2340    /// # Parameters
2341    /// * `ctx`: FUSE request context
2342    /// * `node`: Reference to the node to be copied
2343    ///
2344    /// # Returns
2345    /// Returns a reference to the upper-layer node on success, or an error on failure
2346    async fn copy_node_up(
2347        &self,
2348        ctx: Request,
2349        node: Arc<OverlayInode>,
2350    ) -> Result<Arc<OverlayInode>> {
2351        if node.in_upper_layer().await {
2352            return Ok(node);
2353        }
2354
2355        let st = node.stat64(ctx).await?;
2356        match st.attr.kind {
2357            FileType::Directory => {
2358                node.clone().create_upper_dir(ctx, None).await?;
2359                Ok(node)
2360            }
2361            FileType::Symlink => {
2362                // For symlink.
2363                self.copy_symlink_up(ctx, node).await
2364            }
2365            FileType::RegularFile => {
2366                // For regular file.
2367                self.copy_regfile_up(ctx, node).await
2368            }
2369            _ => {
2370                // For other file types. return error.
2371                Err(Error::from_raw_os_error(libc::EINVAL))
2372            }
2373        }
2374    }
2375
2376    /// recursively copy directory and all its contents to upper layer
2377    async fn copy_directory_up(
2378        &self,
2379        ctx: Request,
2380        node: Arc<OverlayInode>,
2381    ) -> Result<Arc<OverlayInode>> {
2382        // Ensure the directory itself is copied up first
2383        self.copy_node_up(ctx, node.clone()).await?;
2384
2385        // load directory to cache
2386        self.load_directory(ctx, &node).await?;
2387
2388        // go through all children
2389        let children = node.childrens.lock().await.clone();
2390        for (_name, child) in children.iter() {
2391            if _name == "." || _name == ".." {
2392                continue;
2393            }
2394            // jump over whiteout
2395            if child.whiteout.load(Ordering::Relaxed) {
2396                continue;
2397            }
2398            let st = child.stat64(ctx).await?;
2399            if !child.in_upper_layer().await {
2400                match st.attr.kind {
2401                    FileType::Directory => {
2402                        // recursively copy subdirectory
2403                        Box::pin(self.copy_directory_up(ctx, child.clone())).await?;
2404                    }
2405                    FileType::Symlink | FileType::RegularFile => {
2406                        // copy node up symlink or regular file
2407                        Box::pin(self.copy_node_up(ctx, child.clone())).await?;
2408                    }
2409                    _ => {
2410                        // other file types are ignored
2411                    }
2412                }
2413            } else if utils::is_dir(&st.attr.kind) {
2414                // If it is already in the upper layer, but the directory is not loaded,
2415                // ensure that its contents are also copied up recursively.
2416                Box::pin(self.copy_directory_up(ctx, child.clone())).await?;
2417            }
2418        }
2419
2420        Ok(node)
2421    }
2422
2423    async fn do_rm(&self, ctx: Request, parent: u64, name: &OsStr, dir: bool) -> Result<()> {
2424        // 1. Read-only mount guard
2425        if self.upper_layer.is_none() {
2426            return Err(Error::from_raw_os_error(libc::EROFS));
2427        }
2428
2429        // 2. Locate the parent Overlay Inode.
2430        // Find parent Overlay Inode.
2431        let pnode = self.lookup_node(ctx, parent, "").await?;
2432        if pnode.whiteout.load(Ordering::Relaxed) {
2433            return Err(Error::from_raw_os_error(libc::ENOENT));
2434        }
2435        let to_name = name.to_str().unwrap();
2436
2437        // 3. Locate the child Overlay Inode for the given name
2438        // Find the Overlay Inode for child with <name>.
2439        let node = self.lookup_node(ctx, parent, to_name).await?;
2440        if node.whiteout.load(Ordering::Relaxed) {
2441            // already deleted.
2442            return Err(Error::from_raw_os_error(libc::ENOENT));
2443        }
2444
2445        // 4. If removing a directory, ensure it is empty of real entries
2446        if dir {
2447            self.load_directory(ctx, &node).await?;
2448            let (count, whiteouts) = node.count_entries_and_whiteout(ctx).await?;
2449            trace!("entries: {count}, whiteouts: {whiteouts}\n");
2450            if count > 0 {
2451                return Err(Error::from_raw_os_error(libc::ENOTEMPTY));
2452            }
2453
2454            // Delete all whiteouts.
2455            if whiteouts > 0 && node.in_upper_layer().await {
2456                self.empty_node_directory(ctx, Arc::clone(&node)).await?;
2457            }
2458
2459            trace!("whiteouts deleted!\n");
2460        }
2461
2462        // 5. Decide whether we need to create a whiteout entry
2463        // We'll filp this off if upper-layer unlink suffices or parent is opaque
2464        let need_whiteout = AtomicBool::new(true);
2465        let pnode = self.copy_node_up(ctx, Arc::clone(&pnode)).await?;
2466
2467        if node.upper_layer_only().await {
2468            need_whiteout.store(false, Ordering::Relaxed);
2469        }
2470
2471        let mut df = |parent_upper_inode: Option<Arc<RealInode>>| async {
2472            let parent_real_inode = parent_upper_inode.ok_or_else(|| {
2473                error!(
2474                    "BUG: parent {} has no upper inode after copy up",
2475                    pnode.inode
2476                );
2477                Error::from_raw_os_error(libc::EINVAL)
2478            })?;
2479
2480            // Parent is opaque, it shadows everything in lower layers so no need to create extra whiteouts.
2481            if parent_real_inode.opaque {
2482                need_whiteout.store(false, Ordering::Relaxed);
2483            }
2484            if dir {
2485                parent_real_inode
2486                    .layer
2487                    .rmdir(ctx, parent_real_inode.inode, name)
2488                    .await?;
2489            } else {
2490                parent_real_inode
2491                    .layer
2492                    .unlink(ctx, parent_real_inode.inode, name)
2493                    .await?;
2494            }
2495
2496            Ok(false)
2497        };
2498
2499        // 6. Perform the unlink/rmdir operation and memory cleanup
2500        if node.in_upper_layer().await {
2501            pnode.handle_upper_inode_locked(&mut df).await?;
2502        }
2503        pnode.remove_child(name.to_str().unwrap()).await;
2504        let path = node.path.read().await.clone();
2505        self.remove_inode(node.inode, Some(path)).await;
2506
2507        // 7. If needed, create a entry in the upper layer to mask lower-layer files
2508        if need_whiteout.load(Ordering::Relaxed) {
2509            trace!("do_rm: creating whiteout\n");
2510            // pnode is copied up, so it has upper layer.
2511            pnode
2512                .handle_upper_inode_locked(
2513                    &mut |parent_upper_inode: Option<Arc<RealInode>>| async {
2514                        let parent_real_inode = parent_upper_inode.ok_or_else(|| {
2515                            error!(
2516                                "BUG: parent {} has no upper inode after copy up",
2517                                pnode.inode
2518                            );
2519                            Error::from_raw_os_error(libc::EINVAL)
2520                        })?;
2521
2522                        let child_ri = parent_real_inode.create_whiteout(ctx, to_name).await?; //FIXME..............
2523                        let path = format!("{}/{}", pnode.path.read().await, to_name);
2524                        let ino: u64 = self.alloc_inode(&path).await?;
2525                        let ovi = Arc::new(
2526                            OverlayInode::new_from_real_inode(to_name, ino, path.clone(), child_ri)
2527                                .await,
2528                        );
2529
2530                        self.insert_inode(ino, ovi.clone()).await;
2531                        pnode.insert_child(to_name, ovi.clone()).await;
2532                        Ok(false)
2533                    },
2534                )
2535                .await?;
2536        }
2537
2538        Ok(())
2539    }
2540
2541    async fn do_fsync(
2542        &self,
2543        ctx: Request,
2544        inode: Inode,
2545        datasync: bool,
2546        handle: Handle,
2547        syncdir: bool,
2548    ) -> Result<()> {
2549        // Use O_RDONLY flags which indicates no copy up.
2550        let data = self
2551            .get_data(ctx, Some(handle), inode, libc::O_RDONLY as u32)
2552            .await?;
2553
2554        trace!("do_fsync: got data for handle: {handle}, inode:{inode}");
2555
2556        match data.real_handle {
2557            // FIXME: need to test if inode matches corresponding handle?
2558            None => {
2559                trace!("do_fsync: no real handle found for handle: {handle}, inode:{inode}");
2560                Err(Error::from_raw_os_error(libc::ENOENT))
2561            }
2562            Some(ref rh) => {
2563                let real_handle = rh.handle.load(Ordering::Relaxed);
2564                // TODO: check if it's in upper layer? @weizhang555
2565                if syncdir {
2566                    trace!(
2567                        "do_fsync: layer.fsyncdir called for handle: {}, inode:{}; rh.inode: {}, real_handle: {}",
2568                        handle, inode, rh.inode, real_handle
2569                    );
2570                    rh.layer
2571                        .fsyncdir(ctx, rh.inode, real_handle, datasync)
2572                        .await
2573                        .map_err(|e| e.into())
2574                } else {
2575                    rh.layer
2576                        .fsync(ctx, rh.inode, real_handle, datasync)
2577                        .await
2578                        .map_err(|e| e.into())
2579                }
2580            }
2581        }
2582    }
2583
2584    // Delete everything in the directory only on upper layer, ignore lower layers.
2585    async fn empty_node_directory(&self, ctx: Request, node: Arc<OverlayInode>) -> Result<()> {
2586        let st = node.stat64(ctx).await?;
2587        if !utils::is_dir(&st.attr.kind) {
2588            // This function can only be called on directories.
2589            return Err(Error::from_raw_os_error(libc::ENOTDIR));
2590        }
2591
2592        let (layer, in_upper, inode) = node.first_layer_inode().await;
2593        if !in_upper {
2594            return Ok(());
2595        }
2596
2597        // Copy node.childrens Hashmap to Vector, the Vector is also used as temp storage,
2598        // Without this, Rust won't allow us to remove them from node.childrens.
2599        let iter = node
2600            .childrens
2601            .lock()
2602            .await
2603            .values()
2604            .cloned()
2605            .collect::<Vec<_>>();
2606
2607        for child in iter {
2608            // We only care about upper layer, ignore lower layers.
2609            if child.in_upper_layer().await {
2610                let child_name = child.name.read().await.clone();
2611                let child_name_os = OsStr::new(&child_name);
2612                if child.whiteout.load(Ordering::Relaxed) {
2613                    layer.delete_whiteout(ctx, inode, child_name_os).await?
2614                } else {
2615                    let s = child.stat64(ctx).await?;
2616                    let cname: &OsStr = OsStr::new(&child_name_os);
2617                    if utils::is_dir(&s.attr.kind) {
2618                        let (count, whiteouts) = child.count_entries_and_whiteout(ctx).await?;
2619                        if count + whiteouts > 0 {
2620                            let cb = child.clone();
2621                            Box::pin(async move { self.empty_node_directory(ctx, cb).await })
2622                                .await?;
2623                        }
2624                        layer.rmdir(ctx, inode, cname).await?
2625                    } else {
2626                        layer.unlink(ctx, inode, cname).await?;
2627                    }
2628                }
2629
2630                let cpath = child.path.read().await.clone();
2631                // delete the child
2632                self.remove_inode(child.inode, Some(cpath)).await;
2633                node.remove_child(&child_name).await;
2634            }
2635        }
2636
2637        Ok(())
2638    }
2639
2640    async fn find_real_info_from_handle(
2641        &self,
2642        handle: Handle,
2643    ) -> Result<(Arc<BoxedLayer>, Inode, Handle)> {
2644        match self.handles.lock().await.get(&handle) {
2645            Some(h) => match h.real_handle {
2646                Some(ref rhd) => {
2647                    trace!(
2648                        "find_real_info_from_handle: layer in upper: {}",
2649                        rhd.in_upper_layer
2650                    );
2651                    Ok((
2652                        rhd.layer.clone(),
2653                        rhd.inode,
2654                        rhd.handle.load(Ordering::Relaxed),
2655                    ))
2656                }
2657                None => Err(Error::from_raw_os_error(libc::ENOENT)),
2658            },
2659
2660            None => Err(Error::from_raw_os_error(libc::ENOENT)),
2661        }
2662    }
2663
2664    async fn find_real_inode(&self, inode: Inode) -> Result<(Arc<BoxedLayer>, Inode)> {
2665        if let Some(n) = self.get_active_inode(inode).await {
2666            let (first_layer, _, first_inode) = n.first_layer_inode().await;
2667            return Ok((first_layer, first_inode));
2668        } else if let Some(n) = self.get_all_inode(inode).await {
2669            trace!("find_real_inode: found inode by get_all_inode: {}", n.inode);
2670            let (first_layer, _, first_inode) = n.first_layer_inode().await;
2671            return Ok((first_layer, first_inode));
2672        }
2673
2674        Err(Error::from_raw_os_error(libc::ENOENT))
2675    }
2676
2677    async fn get_data(
2678        &self,
2679        ctx: Request,
2680        handle: Option<Handle>,
2681        inode: Inode,
2682        flags: u32,
2683    ) -> Result<Arc<HandleData>> {
2684        let no_open = self.no_open.load(Ordering::Relaxed);
2685        if !no_open {
2686            if let Some(h) = handle
2687                && let Some(v) = self.handles.lock().await.get(&h)
2688                && v.node.inode == inode
2689            {
2690                // trace!("get_data: found handle");
2691                return Ok(Arc::clone(v));
2692            }
2693        } else {
2694            let readonly: bool = flags
2695                & (libc::O_APPEND | libc::O_CREAT | libc::O_TRUNC | libc::O_RDWR | libc::O_WRONLY)
2696                    as u32
2697                == 0;
2698
2699            // lookup node
2700            let node = self.lookup_node(ctx, inode, "").await?;
2701
2702            // whiteout node
2703            if node.whiteout.load(Ordering::Relaxed) {
2704                return Err(Error::from_raw_os_error(libc::ENOENT));
2705            }
2706
2707            if !readonly {
2708                // Check if upper layer exists, return EROFS is not exists.
2709                self.upper_layer
2710                    .as_ref()
2711                    .cloned()
2712                    .ok_or_else(|| Error::from_raw_os_error(libc::EROFS))?;
2713                // copy up to upper layer
2714                self.copy_node_up(ctx, Arc::clone(&node)).await?;
2715            }
2716
2717            let (layer, in_upper_layer, inode) = node.first_layer_inode().await;
2718            let handle_data = HandleData {
2719                node: Arc::clone(&node),
2720                real_handle: Some(RealHandle {
2721                    layer,
2722                    in_upper_layer,
2723                    inode,
2724                    handle: AtomicU64::new(0),
2725                }),
2726                dir_snapshot: Mutex::new(None),
2727            };
2728            return Ok(Arc::new(handle_data));
2729        }
2730
2731        Err(Error::from_raw_os_error(libc::ENOENT))
2732    }
2733
2734    // extend or init the inodes number to one overlay if the current number is done.
2735    pub async fn extend_inode_alloc(&self, key: u64) {
2736        let next_inode = key * INODE_ALLOC_BATCH;
2737        let limit_inode = next_inode + INODE_ALLOC_BATCH - 1;
2738        self.inodes
2739            .write()
2740            .await
2741            .extend_inode_number(next_inode, limit_inode);
2742    }
2743}
2744
/// Wrap the parameters for mounting overlay filesystem.
///
/// Consumed by `mount_fs`; the field comments describe how `mount_fs` uses each value.
#[derive(Debug, Clone)]
pub struct OverlayArgs<P, Q, R, M, N, I>
where
    P: AsRef<Path>,
    Q: AsRef<Path>,
    R: AsRef<Path>,
    M: AsRef<str>,
    N: Into<String>,
    I: IntoIterator<Item = R>,
{
    /// Path the filesystem is mounted on; also stored in the overlay `Config`.
    pub mountpoint: P,
    /// Root directory of the upper layer.
    pub upperdir: Q,
    /// Root directories of the lower layers, one layer per item, in iteration order.
    pub lowerdir: I,
    /// `true` mounts via `Session::mount`, `false` via `Session::mount_with_unprivileged`.
    pub privileged: bool,
    /// Optional mapping string handed to every passthrough layer.
    pub mapping: Option<M>,
    /// Optional filesystem name set on the mount options.
    pub name: Option<N>,
    /// Forwarded to the `allow_other` mount option.
    pub allow_other: bool,
}
2764
2765/// Mounts the filesystem using the given parameters and returns the mount handle.
2766///
2767/// # Parameters
2768/// - `mountpoint`: Path to the mount point.
2769/// - `upperdir`: Path to the upper directory.
2770/// - `lowerdir`: Paths to the lower directories.
2771/// - `privileged`: If true, use privileged mount; otherwise, unprivileged mount.
2772/// - `mapping`: Optional user/group ID mapping for unprivileged mounts.
2773/// - `name`: Optional name for the filesystem.
2774/// - `allow_other`: If true, allows other users to access the filesystem.
2775///
2776/// # Returns
2777/// A mount handle on success.
2778pub async fn mount_fs<P, Q, R, M, N, I>(
2779    args: OverlayArgs<P, Q, R, M, N, I>,
2780) -> rfuse3::raw::MountHandle
2781where
2782    P: AsRef<Path>,
2783    Q: AsRef<Path>,
2784    R: AsRef<Path>,
2785    M: AsRef<str>,
2786    N: Into<String>,
2787    I: IntoIterator<Item = R>,
2788{
2789    // Create lower layers
2790    let mut lower_layers: Vec<Arc<BoxedLayer>> = Vec::new();
2791    for lower in args.lowerdir {
2792        let layer = new_passthroughfs_layer(PassthroughArgs {
2793            root_dir: lower,
2794            mapping: args.mapping.as_ref().map(|m| m.as_ref()),
2795        })
2796        .await
2797        .expect("Failed to create lower filesystem layer");
2798        lower_layers.push(Arc::new(layer) as Arc<BoxedLayer>);
2799    }
2800    // Create upper layer
2801    let upper_layer: Arc<BoxedLayer> = Arc::new(
2802        new_passthroughfs_layer(PassthroughArgs {
2803            root_dir: args.upperdir,
2804            mapping: args.mapping.as_ref().map(|m| m.as_ref()),
2805        })
2806        .await
2807        .expect("Failed to create upper filesystem layer"),
2808    );
2809
2810    // Configure overlay filesystem
2811    let config = Config {
2812        mountpoint: args.mountpoint.as_ref().to_path_buf(),
2813        do_import: true,
2814        ..Default::default()
2815    };
2816    let overlayfs = OverlayFs::new(Some(upper_layer), lower_layers, config, 1)
2817        .expect("Failed to initialize OverlayFs");
2818    let logfs = LoggingFileSystem::new(overlayfs);
2819
2820    let mount_path: OsString = OsString::from(args.mountpoint.as_ref().as_os_str());
2821
2822    // Obtain the current user's uid and gid
2823    let uid = unsafe { libc::getuid() };
2824    let gid = unsafe { libc::getgid() };
2825
2826    let mut mount_options = MountOptions::default();
2827    #[cfg(target_os = "linux")]
2828    mount_options.force_readdir_plus(true);
2829
2830    mount_options
2831        .uid(uid)
2832        .gid(gid)
2833        .allow_other(args.allow_other);
2834    if let Some(name) = args.name {
2835        mount_options.fs_name(name);
2836    }
2837
2838    // Mount filesystem based on privilege flag and return the mount handle
2839    if !args.privileged {
2840        debug!("Mounting with unprivileged mode");
2841        Session::new(mount_options)
2842            .mount_with_unprivileged(logfs, mount_path)
2843            .await
2844            .expect("Unprivileged mount failed")
2845    } else {
2846        debug!("Mounting with privileged mode");
2847        Session::new(mount_options)
2848            .mount(logfs, mount_path)
2849            .await
2850            .expect("Privileged mount failed")
2851    }
2852}