// nydus_service/fusedev.rs

1// Copyright 2020 Ant Group. All rights reserved.
2// Copyright (C) 2020 Alibaba Cloud. All rights reserved.
3//
4// SPDX-License-Identifier: (Apache-2.0 AND BSD-3-Clause)
5
6//! Nydus FUSE filesystem daemon.
7
8use core::option::Option::None;
9use nydus_rafs::metadata::{RafsInode, RafsInodeWalkAction};
10use std::any::Any;
11use std::ffi::{CStr, CString, OsStr, OsString};
12use std::fs::metadata;
13use std::io::{Error, ErrorKind, Result, Write};
14use std::ops::Deref;
15#[cfg(target_os = "linux")]
16use std::os::linux::fs::MetadataExt;
17#[cfg(target_os = "linux")]
18use std::os::unix::ffi::OsStrExt;
19#[cfg(target_os = "macos")]
20use std::os::unix::fs::MetadataExt;
21use std::os::unix::net::UnixStream;
22use std::path::{Path, PathBuf};
23use std::sync::{
24    atomic::{AtomicI32, AtomicU64, Ordering},
25    mpsc::{channel, Receiver, Sender},
26    Arc, Mutex, MutexGuard,
27};
28use std::thread::{self, JoinHandle};
29use std::time::{SystemTime, UNIX_EPOCH};
30
31use fuse_backend_rs::abi::fuse_abi::{InHeader, OutHeader};
32use fuse_backend_rs::api::server::{MetricsHook, Server};
33use fuse_backend_rs::api::Vfs;
34use fuse_backend_rs::transport::{FuseChannel, FuseSession, FuseSessionExt};
35use mio::Waker;
36#[cfg(target_os = "linux")]
37use nix::sys::stat::{major, minor};
38use nydus_api::BuildTimeInfo;
39use serde::Serialize;
40
41use crate::daemon::{
42    DaemonState, DaemonStateMachineContext, DaemonStateMachineInput, DaemonStateMachineSubscriber,
43    NydusDaemon,
44};
45use crate::fs_service::{FsBackendCollection, FsBackendMountCmd, FsService};
46use crate::upgrade::{self, FailoverPolicy, UpgradeManager};
47use crate::{Error as NydusError, FsBackendType, FuseNotifyError, Result as NydusResult};
48
/// Bit position of the VFS filesystem index inside a kernel-visible inode
/// number: the upper 8 bits carry the index, the lower bits the RAFS ino
/// (see `walk_and_notify_invalidation`).
const FS_IDX_SHIFT: u64 = 56;
50
/// Snapshot of one FUSE request, used for inflight-operation reporting.
#[derive(Serialize)]
struct FuseOp {
    // Inode number the request operates on (from the FUSE `InHeader.nodeid`).
    inode: u64,
    // FUSE opcode of the request.
    opcode: u32,
    // Unique request id assigned by the kernel.
    unique: u64,
    // Seconds since the Unix epoch when the request was observed.
    timestamp_secs: u64,
}
58
59impl Default for FuseOp {
60    fn default() -> Self {
61        // unwrap because time can't be earlier than EPOCH.
62        let timestamp_secs = SystemTime::now()
63            .duration_since(UNIX_EPOCH)
64            .unwrap()
65            .as_secs();
66
67        Self {
68            inode: u64::default(),
69            opcode: u32::default(),
70            unique: u64::default(),
71            timestamp_secs,
72        }
73    }
74}
75
/// Shared, serializable slot holding the FUSE request a worker thread is
/// currently serving (`None` while the thread is idle).
#[derive(Default, Clone, Serialize)]
struct FuseOpWrapper {
    // Mutex should be acceptable since `inflight_op` is always updated
    // within the same thread, which means locking is always directly acquired.
    op: Arc<Mutex<Option<FuseOp>>>,
}
82
83impl MetricsHook for FuseOpWrapper {
84    fn collect(&self, ih: &InHeader) {
85        let (n, u, o) = (ih.nodeid, ih.unique, ih.opcode);
86        // Unwrap is safe because time can't be earlier than EPOCH
87        let timestamp_secs = SystemTime::now()
88            .duration_since(UNIX_EPOCH)
89            .unwrap()
90            .as_secs();
91        let op = FuseOp {
92            inode: n,
93            unique: u,
94            opcode: o,
95            timestamp_secs,
96        };
97
98        *self.op.lock().expect("Not expect poisoned lock") = Some(op);
99    }
100
101    fn release(&self, _oh: Option<&OutHeader>) {
102        *self.op.lock().expect("Not expect poisoned lock") = None
103    }
104}
105
/// One FUSE worker: the shared request dispatcher plus a private channel
/// to `/dev/fuse`.
struct FuseServer {
    // Dispatcher shared by all workers; routes requests into the VFS.
    server: Arc<Server<Arc<Vfs>>>,
    // Per-worker channel for reading requests and writing replies.
    ch: FuseChannel,
}
110
111impl FuseServer {
112    fn new(server: Arc<Server<Arc<Vfs>>>, se: &FuseSession) -> Result<FuseServer> {
113        let ch = se.new_channel().map_err(|e| eother!(e))?;
114        Ok(FuseServer { server, ch })
115    }
116
117    fn svc_loop(&mut self, metrics_hook: &dyn MetricsHook) -> Result<()> {
118        // Given error EBADF, it means kernel has shut down this session.
119        let _ebadf = Error::from_raw_os_error(libc::EBADF);
120
121        loop {
122            if let Some((reader, writer)) = self.ch.get_request().map_err(|e| {
123                Error::new(
124                    ErrorKind::Other,
125                    format!("failed to get fuse request from /dev/fuse, {}", e),
126                )
127            })? {
128                if let Err(e) =
129                    self.server
130                        .handle_message(reader, writer.into(), None, Some(metrics_hook))
131                {
132                    match e {
133                        fuse_backend_rs::Error::EncodeMessage(_ebadf) => {
134                            return Err(eio!("fuse session has been shut down"));
135                        }
136                        _ => {
137                            error!("Handling fuse message, {}", NydusError::ProcessQueue(e));
138                            continue;
139                        }
140                    }
141                }
142            } else {
143                info!("fuse server exits");
144                break;
145            }
146        }
147
148        Ok(())
149    }
150}
151
/// Borrows the FUSE session and server to send notifications through
/// `/dev/fuse` (e.g. asking the kernel to resend pending requests).
struct FusedevNotifier<'a> {
    session: &'a Mutex<FuseSession>,
    server: &'a Arc<Server<Arc<Vfs>>>,
}
156
impl<'a> FusedevNotifier<'a> {
    /// Create a notifier borrowing the daemon's session and server.
    fn new(session: &'a Mutex<FuseSession>, server: &'a Arc<Server<Arc<Vfs>>>) -> Self {
        FusedevNotifier { session, server }
    }

    /// Ask the kernel to resend pending requests by writing a resend
    /// notification through the session's `/dev/fuse` writer.
    fn notify_resend(&self) -> NydusResult<()> {
        let mut session = self.session.lock().unwrap();
        session
            .try_with_writer(|writer| {
                self.server
                    .notify_resend(writer)
                    .map_err(FuseNotifyError::FuseWriteError)
            })
            .map_err(NydusError::NotifyError)
    }
}
173
/// Sends FUSE connection control events ("resend"/"flush") by writing to the
/// kernel's sysfs/procfs connection control files.
struct FuseSysfsNotifier<'a> {
    // FUSE connection id; used as the directory name under the control path.
    conn: &'a AtomicU64,
}
177
178impl<'a> FuseSysfsNotifier<'a> {
179    fn new(conn: &'a AtomicU64) -> Self {
180        Self { conn }
181    }
182
183    fn get_possible_base_paths() -> Vec<&'static str> {
184        vec!["/proc/sys/fs/fuse/connections", "/sys/fs/fuse/connections"]
185    }
186
187    fn try_notify_with_path(
188        &self,
189        base_path: &str,
190        event: &str,
191    ) -> std::result::Result<(), FuseNotifyError> {
192        let path = PathBuf::from(base_path)
193            .join(self.conn.load(Ordering::Acquire).to_string())
194            .join(event);
195
196        let mut file = std::fs::OpenOptions::new()
197            .write(true)
198            .open(&path)
199            .map_err(FuseNotifyError::SysfsOpenError)?;
200
201        file.write_all(b"1")
202            .map_err(FuseNotifyError::SysfsWriteError)?;
203        Ok(())
204    }
205
206    fn notify(&self, event: &str) -> NydusResult<()> {
207        let paths = Self::get_possible_base_paths();
208
209        for (idx, path) in paths.iter().enumerate() {
210            match self.try_notify_with_path(path, event) {
211                Ok(()) => return Ok(()),
212                Err(e) => {
213                    if !matches!(e, FuseNotifyError::SysfsOpenError(_)) || idx == paths.len() - 1 {
214                        return Err(e.into());
215                    }
216                }
217            }
218        }
219
220        Ok(())
221    }
222
223    fn notify_resend(&self) -> NydusResult<()> {
224        self.notify("resend")
225    }
226
227    fn notify_flush(&self) -> NydusResult<()> {
228        self.notify("flush")
229    }
230}
231
#[allow(dead_code)]
pub struct FusedevFsService {
    /// Fuse connection ID which usually equals to `st_dev`
    pub conn: AtomicU64,
    /// Policy deciding how pending requests are drained on failover.
    pub failover_policy: FailoverPolicy,
    /// FUSE session bound to the mountpoint.
    pub session: Mutex<FuseSession>,

    // Request dispatcher shared by all worker threads.
    server: Arc<Server<Arc<Vfs>>>,
    // Present only when a supervisor is configured (live-upgrade support).
    upgrade_mgr: Option<Mutex<UpgradeManager>>,
    // Virtual filesystem the FUSE requests are served from.
    vfs: Arc<Vfs>,

    // Metadata about mounted filesystem backends.
    backend_collection: Mutex<FsBackendCollection>,
    // One inflight-operation slot per worker thread, for introspection.
    inflight_ops: Mutex<Vec<FuseOpWrapper>>,
}
246
247impl FusedevFsService {
248    fn new(
249        vfs: Arc<Vfs>,
250        mnt: &Path,
251        supervisor: Option<&String>,
252        failover_policy: FailoverPolicy,
253        readonly: bool,
254    ) -> Result<Self> {
255        let session = FuseSession::new(mnt, "rafs", "", readonly).map_err(|e| eother!(e))?;
256        let upgrade_mgr = supervisor
257            .as_ref()
258            .map(|s| Mutex::new(UpgradeManager::new(s.to_string().into())));
259
260        Ok(FusedevFsService {
261            vfs: vfs.clone(),
262            conn: AtomicU64::new(0),
263            failover_policy,
264            session: Mutex::new(session),
265            server: Arc::new(Server::new(vfs)),
266            upgrade_mgr,
267
268            backend_collection: Default::default(),
269            inflight_ops: Default::default(),
270        })
271    }
272
273    fn create_fuse_server(&self) -> Result<FuseServer> {
274        FuseServer::new(self.server.clone(), self.session.lock().unwrap().deref())
275    }
276
277    fn create_inflight_op(&self) -> FuseOpWrapper {
278        let inflight_op = FuseOpWrapper::default();
279
280        // "Not expected poisoned lock"
281        self.inflight_ops.lock().unwrap().push(inflight_op.clone());
282
283        inflight_op
284    }
285
286    fn umount(&self) -> NydusResult<()> {
287        let mut session = self.session.lock().expect("Not expect poisoned lock.");
288        session.umount().map_err(NydusError::SessionShutdown)?;
289        session.wake().map_err(NydusError::SessionShutdown)?;
290        Ok(())
291    }
292
293    pub fn drain_fuse_requests(&self) -> NydusResult<()> {
294        let fusedev_notifier = FusedevNotifier::new(&self.session, &self.server);
295        let sysfs_notifier = FuseSysfsNotifier::new(&self.conn);
296
297        match self.failover_policy {
298            FailoverPolicy::None => Ok(()),
299            FailoverPolicy::Flush => sysfs_notifier.notify_flush(),
300            FailoverPolicy::Resend => fusedev_notifier.notify_resend().or_else(|e| {
301                error!(
302                    "Failed to notify resend by /dev/fuse, {:?}. Trying to do it by sysfs",
303                    e
304                );
305                sysfs_notifier.notify_resend()
306            }),
307        }
308    }
309}
310
impl FsService for FusedevFsService {
    fn get_vfs(&self) -> &Vfs {
        &self.vfs
    }

    fn upgrade_mgr(&self) -> Option<MutexGuard<'_, UpgradeManager>> {
        self.upgrade_mgr.as_ref().map(|mgr| mgr.lock().unwrap())
    }

    fn backend_collection(&self) -> MutexGuard<'_, FsBackendCollection> {
        self.backend_collection.lock().unwrap()
    }

    /// Serialize all currently inflight FUSE operations to a JSON string,
    /// returning `None` when every worker thread is idle.
    fn export_inflight_ops(&self) -> NydusResult<Option<String>> {
        let ops = self.inflight_ops.lock().unwrap();

        // Keep only the slots that actually hold an operation right now.
        let r = ops
            .iter()
            .filter(|w| w.op.lock().unwrap().is_some())
            .map(|w| &w.op)
            .collect::<Vec<&Arc<Mutex<Option<FuseOp>>>>>();

        if r.is_empty() {
            Ok(None)
        } else {
            let resp = serde_json::to_string(&r).map_err(NydusError::Serde)?;
            Ok(Some(resp))
        }
    }

    /// Recursively walk the inode tree and send cache invalidation notifications.
    ///
    /// Post-order traversal: children are invalidated before the current node.
    /// The kernel-visible inode number is composed from the VFS filesystem
    /// index (top 8 bits) and the RAFS inode number. Errors from recursive
    /// calls are logged and swallowed so one bad entry doesn't abort the walk.
    fn walk_and_notify_invalidation(
        &self,
        parent_kernel_ino: u64,
        cur_name: &str,
        cur_inode: Arc<dyn RafsInode>,
        fs_idx: u8,
    ) -> NydusResult<()> {
        // Kernel-visible inode: fs index in the top 8 bits, RAFS ino below.
        let cur_kernel_ino = ((fs_idx as u64) << FS_IDX_SHIFT) | cur_inode.ino();

        if cur_inode.is_dir() {
            let mut handler =
                |child: Option<Arc<dyn RafsInode>>, name: OsString, _ino: u64, _offset: u64| {
                    // Skip the self/parent pseudo entries.
                    if name != OsStr::new(".") && name != OsStr::new("..") {
                        if let Some(child_inode) = child {
                            let child_name = name.to_string_lossy().to_string();
                            // Recursive call
                            if let Err(e) = self.walk_and_notify_invalidation(
                                cur_kernel_ino,
                                &child_name,
                                child_inode,
                                fs_idx,
                            ) {
                                warn!("recursive walk failed for {}: {:?}", child_name, e);
                            }
                        }
                    }
                    Ok(RafsInodeWalkAction::Continue)
                };

            cur_inode.walk_children_inodes(0, &mut handler)?;
        }

        // === Post-order: invalidate cache of the current node ===
        let cstr_name = CString::new(cur_name).map_err(|_| eother!("invalid file name"))?;
        // Invalidate inode cache
        self.session.lock().unwrap().with_writer(|writer| {
            if let Err(e) = self.server.notify_inval_inode(writer, cur_kernel_ino, 0, 0) {
                warn!("notify_inval_inode failed: {} {:?}", cur_name, e);
            }
        });

        // Invalidate the dentry `cur_name` under the parent directory.
        // NOTE(review): the session lock is taken separately per notification,
        // so the two notifies are not atomic with respect to other writers.
        self.session.lock().unwrap().with_writer(|writer| {
            if let Err(e) =
                self.server
                    .notify_inval_entry(writer, parent_kernel_ino, cstr_name.as_c_str())
            {
                warn!("notify_inval_entry failed: {} {:?}", cur_name, e);
            }
        });

        Ok(())
    }

    /// Check whether the filesystem service is a FUSE service.
    fn is_fuse(&self) -> bool {
        true
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}
404
/// Nydus daemon to implement FUSE servers by accessing `/dev/fuse`.
///
/// One FUSE mountpoint will be created for each [FusedevDaemon] object. Every [FusedevDaemon]
/// object has a built-in [Vfs](https://docs.rs/fuse-backend-rs/latest/fuse_backend_rs/api/vfs/struct.Vfs.html)
/// object, which can be used to mount multiple RAFS and/or passthroughfs instances.
pub struct FusedevDaemon {
    // Build-time/version information reported by `version()`.
    bti: BuildTimeInfo,
    // Optional daemon identifier.
    id: Option<String>,
    // Sends events to the state machine thread.
    request_sender: Arc<Mutex<Sender<DaemonStateMachineInput>>>,
    // Receives results of state machine transitions.
    result_receiver: Mutex<Receiver<NydusResult<()>>>,
    // The FUSE filesystem service owned by this daemon.
    service: Arc<FusedevFsService>,
    // Current `DaemonState`, stored as its i32 representation.
    state: AtomicI32,
    // Supervisor socket path, if live-upgrade/failover is enabled.
    pub supervisor: Option<String>,
    // Number of FUSE worker threads to spawn.
    threads_cnt: u32,
    // Join handle of the state machine thread.
    state_machine_thread: Mutex<Option<JoinHandle<Result<()>>>>,
    // Join handles of the FUSE worker threads.
    fuse_service_threads: Mutex<Vec<JoinHandle<Result<()>>>>,
    // Used by workers to notify the daemon controller on abnormal exit.
    waker: Arc<Waker>,
}
423
424impl FusedevDaemon {
425    /// Create a new instance of [FusedevDaemon].
426    #[allow(clippy::too_many_arguments)]
427    pub fn new(
428        trigger: Sender<DaemonStateMachineInput>,
429        receiver: Receiver<NydusResult<()>>,
430        vfs: Arc<Vfs>,
431        mountpoint: &Path,
432        threads_cnt: u32,
433        waker: Arc<Waker>,
434        bti: BuildTimeInfo,
435        id: Option<String>,
436        supervisor: Option<String>,
437        readonly: bool,
438        fp: FailoverPolicy,
439    ) -> Result<Self> {
440        let service = FusedevFsService::new(vfs, mountpoint, supervisor.as_ref(), fp, readonly)?;
441
442        Ok(FusedevDaemon {
443            bti,
444            id,
445            supervisor,
446            threads_cnt,
447            waker,
448
449            state: AtomicI32::new(DaemonState::INIT as i32),
450            result_receiver: Mutex::new(receiver),
451            request_sender: Arc::new(Mutex::new(trigger)),
452            service: Arc::new(service),
453            state_machine_thread: Mutex::new(None),
454            fuse_service_threads: Mutex::new(Vec::new()),
455        })
456    }
457
458    fn kick_one_server(&self, waker: Arc<Waker>) -> NydusResult<()> {
459        let mut s = self
460            .service
461            .create_fuse_server()
462            .map_err(NydusError::CreateFuseServer)?;
463        let inflight_op = self.service.create_inflight_op();
464        let thread = thread::Builder::new()
465            .name("fuse_server".to_string())
466            .spawn(move || {
467                if let Err(_err) = s.svc_loop(&inflight_op) {
468                    // Notify the daemon controller that one working thread has exited.
469                    if let Err(err) = waker.wake() {
470                        error!("fail to exit daemon, error: {:?}", err);
471                    }
472                }
473                Ok(())
474            })
475            .map_err(NydusError::ThreadSpawn)?;
476
477        self.fuse_service_threads.lock().unwrap().push(thread);
478
479        Ok(())
480    }
481}
482
impl DaemonStateMachineSubscriber for FusedevDaemon {
    /// Send `event` to the state machine thread and synchronously wait for
    /// the result of processing it.
    fn on_event(&self, event: DaemonStateMachineInput) -> NydusResult<()> {
        self.request_sender
            .lock()
            .unwrap()
            .send(event)
            .map_err(NydusError::ChannelSend)?;

        // Block until the state machine reports the outcome of this event;
        // the inner NydusResult is propagated to the caller.
        self.result_receiver
            .lock()
            .expect("Not expect poisoned lock!")
            .recv()
            .map_err(NydusError::ChannelReceive)?
    }
}
498
impl NydusDaemon for FusedevDaemon {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn id(&self) -> Option<String> {
        self.id.clone()
    }

    fn version(&self) -> BuildTimeInfo {
        self.bti.clone()
    }

    fn get_state(&self) -> DaemonState {
        self.state.load(Ordering::Relaxed).into()
    }

    fn set_state(&self, state: DaemonState) {
        self.state.store(state as i32, Ordering::Relaxed);
    }

    /// Spawn `threads_cnt` FUSE worker threads serving the session.
    fn start(&self) -> NydusResult<()> {
        info!(
            "start fuse servers with {} worker threads",
            self.threads_cnt
        );
        for _ in 0..self.threads_cnt {
            let waker = self.waker.clone();
            self.kick_one_server(waker)
                .map_err(|e| NydusError::StartService(format!("{}", e)))?;
        }

        Ok(())
    }

    fn umount(&self) -> NydusResult<()> {
        self.service.umount()
    }

    /// Wake up the FUSE session so worker threads can notice shutdown;
    /// errors are logged rather than propagated.
    fn stop(&self) {
        let session = self
            .service
            .session
            .lock()
            .expect("Not expect poisoned lock.");
        if let Err(e) = session.wake().map_err(NydusError::SessionShutdown) {
            error!("failed to stop FUSE service thread: {:?}", e);
        }
    }

    /// Wait for the state machine thread first, then all FUSE worker threads.
    fn wait(&self) -> NydusResult<()> {
        self.wait_state_machine()?;
        self.wait_service()
    }

    /// Join every FUSE worker thread, popping handles one at a time so the
    /// lock is not held across `join()`.
    fn wait_service(&self) -> NydusResult<()> {
        loop {
            let handle = self.fuse_service_threads.lock().unwrap().pop();
            if let Some(handle) = handle {
                handle
                    .join()
                    .map_err(|e| {
                        // A panicked thread yields `Box<dyn Any>`; try to
                        // recover the original I/O error, else wrap the payload.
                        let e = *e
                            .downcast::<Error>()
                            .unwrap_or_else(|e| Box::new(eother!(e)));
                        NydusError::WaitDaemon(e)
                    })?
                    .map_err(NydusError::WaitDaemon)?;
            } else {
                // No more handles to wait
                break;
            }
        }

        Ok(())
    }

    /// Join the state machine thread, if one was ever started.
    fn wait_state_machine(&self) -> NydusResult<()> {
        let mut guard = self.state_machine_thread.lock().unwrap();
        if let Some(handler) = guard.take() {
            let result = handler.join().map_err(|e| {
                let e = *e
                    .downcast::<Error>()
                    .unwrap_or_else(|e| Box::new(eother!(e)));
                NydusError::WaitDaemon(e)
            })?;
            result.map_err(NydusError::WaitDaemon)
        } else {
            Ok(())
        }
    }

    fn supervisor(&self) -> Option<String> {
        self.supervisor.clone()
    }

    /// Save daemon state for live-upgrade via the upgrade helpers.
    fn save(&self) -> NydusResult<()> {
        upgrade::fusedev_upgrade::save(self)
    }

    /// Restore daemon state after a live-upgrade/failover.
    fn restore(&self) -> NydusResult<()> {
        upgrade::fusedev_upgrade::restore(self)
    }

    fn get_default_fs_service(&self) -> Option<Arc<dyn FsService>> {
        Some(self.service.clone())
    }
}
607
#[cfg(target_os = "macos")]
/// Check whether `mp` is a currently mounted filesystem mount point.
fn is_mounted(mp: impl AsRef<Path>) -> Result<bool> {
    let mp = mp
        .as_ref()
        .to_str()
        .ok_or_else(|| Error::from_raw_os_error(libc::EINVAL))?;
    let mp = CString::new(String::from(mp)).map_err(|_| Error::from_raw_os_error(libc::EINVAL))?;

    let mut entries: *mut libc::statfs = std::ptr::null_mut();
    // SAFETY: getmntinfo() stores into `entries` a pointer to a buffer that is
    // owned and managed by libc and must NOT be freed by the caller. The
    // previous code rebuilt a `Vec` from that pointer, which would free
    // libc-owned memory on drop (undefined behavior); borrow it as a slice
    // instead.
    let count = unsafe { libc::getmntinfo(&mut entries, libc::MNT_NOWAIT) };
    if count < 0 {
        return Err(Error::from_raw_os_error(count));
    }
    if count == 0 {
        // No mount entries reported; nothing can match.
        return Ok(false);
    }
    // SAFETY: libc guarantees `entries` points at `count` valid statfs records.
    let mounts = unsafe { std::slice::from_raw_parts(entries, count as usize) };

    let found = mounts.iter().any(|st| {
        // SAFETY: f_mntonname is a NUL-terminated C string filled by the kernel.
        let name = unsafe { CStr::from_ptr(st.f_mntonname.as_ptr()) };
        name == mp.as_c_str()
    });

    Ok(found)
}
635
636// TODO: Perhaps, we can't rely on `/proc/self/mounts` to tell if it is mounted.
637#[cfg(target_os = "linux")]
638fn is_mounted(mp: impl AsRef<Path>) -> Result<bool> {
639    let mounts = CString::new("/proc/self/mounts").unwrap();
640    let ty = CString::new("r").unwrap();
641
642    let mounts_stream = unsafe {
643        libc::setmntent(
644            mounts.as_ptr() as *const libc::c_char,
645            ty.as_ptr() as *const libc::c_char,
646        )
647    };
648
649    loop {
650        let mnt = unsafe { libc::getmntent(mounts_stream) };
651        if mnt as u32 == libc::PT_NULL {
652            break;
653        }
654
655        // Mount point path
656        if unsafe { CStr::from_ptr((*mnt).mnt_dir) }
657            == CString::new(mp.as_ref().as_os_str().as_bytes())?.as_c_str()
658        {
659            unsafe { libc::endmntent(mounts_stream) };
660            return Ok(true);
661        }
662    }
663
664    unsafe { libc::endmntent(mounts_stream) };
665
666    Ok(false)
667}
668
/// A unix socket file is "residual" when it still exists on disk but nothing
/// accepts connections on it anymore — i.e. its owner likely died abnormally.
fn is_sock_residual(sock: impl AsRef<Path>) -> bool {
    let exists = metadata(&sock).is_ok();

    exists && UnixStream::connect(&sock).is_err()
}
676
677/// When nydusd starts, it checks that whether a previous nydusd died unexpected by:
678///     1. Checking whether the mount point is residual by retrieving `/proc/self/mounts`.
679///     2. Checking whether the API socket exists and the connection can established or not.
680fn is_crashed(path: impl AsRef<Path>, sock: &impl AsRef<Path>) -> Result<bool> {
681    if is_mounted(path)? && is_sock_residual(sock) {
682        warn!("A previous daemon crashed! Try to failover later.");
683        return Ok(true);
684    }
685
686    Ok(false)
687}
688
/// Derive the FUSE connection id from the mountpoint: on macOS it is simply
/// the device number (`st_dev`) of the mounted filesystem.
#[cfg(target_os = "macos")]
fn calc_fuse_conn(mp: impl AsRef<Path>) -> Result<u64> {
    let st = metadata(mp.as_ref()).inspect_err(|e| {
        error!("Stat mountpoint {:?}, {}", mp.as_ref(), &e);
    })?;
    Ok(st.dev())
}
696
/// Derive the FUSE connection id from the mountpoint by rebuilding the kernel
/// device number from the mountpoint's `st_dev` major/minor parts.
#[cfg(target_os = "linux")]
fn calc_fuse_conn(mp: impl AsRef<Path>) -> Result<u64> {
    let st = metadata(mp.as_ref()).inspect_err(|e| {
        error!("Stat mountpoint {:?}, {}", mp.as_ref(), &e);
    })?;
    let dev = st.st_dev();
    let (major, minor) = (major(dev), minor(dev));

    // According to kernel formula:  MKDEV(ma,mi) (((ma) << 20) | (mi))
    Ok(major << 20 | minor)
}
708
/// Create and start a [FusedevDaemon] instance.
#[allow(clippy::too_many_arguments)]
pub fn create_fuse_daemon(
    mountpoint: &str,
    vfs: Arc<Vfs>,
    supervisor: Option<String>,
    id: Option<String>,
    threads_cnt: u32,
    waker: Arc<Waker>,
    api_sock: Option<impl AsRef<Path>>,
    upgrade: bool,
    readonly: bool,
    fp: FailoverPolicy,
    mount_cmd: Option<FsBackendMountCmd>,
    bti: BuildTimeInfo,
) -> Result<Arc<dyn NydusDaemon>> {
    let mnt = Path::new(mountpoint).canonicalize()?;
    // Channel pair connecting the daemon with its state machine thread.
    let (trigger, events_rx) = channel::<DaemonStateMachineInput>();
    let (result_sender, result_receiver) = channel::<NydusResult<()>>();
    let daemon = FusedevDaemon::new(
        trigger,
        result_receiver,
        vfs,
        &mnt,
        threads_cnt,
        waker,
        bti,
        id,
        supervisor,
        readonly,
        fp,
    )?;
    let daemon = Arc::new(daemon);
    let machine = DaemonStateMachineContext::new(daemon.clone(), events_rx, result_sender);
    let machine_thread = machine.kick_state_machine()?;
    *daemon.state_machine_thread.lock().unwrap() = Some(machine_thread);

    // Without an api socket, nydusd can do neither live-upgrade nor failover,
    // so the check for a crashed predecessor is unnecessary.
    // Mount only on a fresh start: not restoring from an upgrade and no
    // crashed predecessor to fail over from.
    if (api_sock.as_ref().is_some() && !upgrade && !is_crashed(&mnt, api_sock.as_ref().unwrap())?)
        || api_sock.is_none()
    {
        if let Some(cmd) = mount_cmd {
            daemon.service.mount(cmd).map_err(|e| {
                error!("service mount error: {}", &e);
                eother!(e)
            })?;
        }
        daemon
            .service
            .session
            .lock()
            .unwrap()
            .mount()
            .map_err(|e| {
                error!("service session mount error: {}", &e);
                eother!(e)
            })?;

        // Drive the state machine: register the mount, then start workers.
        daemon
            .on_event(DaemonStateMachineInput::Mount)
            .map_err(|e| eother!(e))?;
        daemon
            .on_event(DaemonStateMachineInput::Start)
            .map_err(|e| eother!(e))?;
        // Record the FUSE connection id derived from the mountpoint's device.
        daemon
            .service
            .conn
            .store(calc_fuse_conn(mnt)?, Ordering::Relaxed);

        // Hand the fuse fd and connection id to the upgrade manager so they
        // survive a live-upgrade.
        if let Some(f) = daemon.service.session.lock().unwrap().get_fuse_file() {
            if let Some(mut m) = daemon.service.upgrade_mgr() {
                m.hold_file(f).map_err(|e| {
                    error!("Failed to hold fusedev fd, {:?}", e);
                    eother!(e)
                })?;
                m.save_fuse_cid(daemon.service.conn.load(Ordering::Acquire));
            }
        }
    }

    Ok(daemon)
}
792
/// Create vfs backend with rafs or passthrough as the fuse filesystem driver
#[cfg(target_os = "macos")]
pub fn create_vfs_backend(
    _fs_type: FsBackendType,
    _is_fuse: bool,
    _hybrid_mode: bool,
) -> Result<Arc<Vfs>> {
    // macOS always uses the default VFS options regardless of the arguments.
    let opts = fuse_backend_rs::api::VfsOptions::default();

    Ok(Arc::new(fuse_backend_rs::api::Vfs::new(opts)))
}
803
804#[cfg(target_os = "linux")]
805pub fn create_vfs_backend(
806    fs_type: FsBackendType,
807    is_fuse: bool,
808    hybrid_mode: bool,
809) -> Result<Arc<Vfs>> {
810    let mut opts = fuse_backend_rs::api::VfsOptions::default();
811    match fs_type {
812        FsBackendType::PassthroughFs => {
813            // passthroughfs requires !no_open
814            opts.no_open = false;
815            opts.no_opendir = false;
816            opts.killpriv_v2 = true;
817        }
818        FsBackendType::Rafs => {
819            // rafs can be readonly and skip open
820            opts.no_open = true;
821        }
822    };
823
824    if !is_fuse && hybrid_mode {
825        opts.no_open = false;
826        opts.no_opendir = false;
827        opts.killpriv_v2 = true;
828    }
829
830    let vfs = fuse_backend_rs::api::Vfs::new(opts);
831    Ok(Arc::new(vfs))
832}