nydus_service/
fs_service.rs

1// Copyright (C) 2020-2022 Alibaba Cloud. All rights reserved.
2// Copyright 2020 Ant Group. All rights reserved.
3// Copyright 2019 Intel Corporation. All Rights Reserved.
4//
5// SPDX-License-Identifier: (Apache-2.0 AND BSD-3-Clause)
6
7//! Infrastructure to define and implement filesystem services.
8
9use std::any::Any;
10use std::collections::HashMap;
11use std::ops::Deref;
12use std::path::{Path, PathBuf};
13use std::str::FromStr;
14use std::sync::{Arc, MutexGuard};
15
16use fuse_backend_rs::abi::fuse_abi::ROOT_ID;
17#[cfg(target_os = "linux")]
18use fuse_backend_rs::api::filesystem::{FileSystem, FsOptions, Layer};
19use fuse_backend_rs::api::vfs::VfsError;
20use fuse_backend_rs::api::{BackFileSystem, Vfs};
21#[cfg(target_os = "linux")]
22use fuse_backend_rs::overlayfs::{config::Config as overlay_config, OverlayFs};
23#[cfg(target_os = "linux")]
24use fuse_backend_rs::passthrough::{CachePolicy, Config as passthrough_config, PassthroughFs};
25use nydus_api::ConfigV2;
26use nydus_rafs::fs::Rafs;
27use nydus_rafs::metadata::RafsInode;
28use nydus_rafs::{RafsError, RafsIoRead};
29use nydus_storage::factory::BLOB_FACTORY;
30use serde::{Deserialize, Serialize};
31use versionize::{VersionMap, Versionize, VersionizeResult};
32use versionize_derive::Versionize;
33
34use crate::upgrade::UpgradeManager;
35use crate::{Error, FsBackendDescriptor, FsBackendType, Result};
36
37/// Request structure to mount a filesystem instance.
38#[derive(Clone, Versionize, Debug)]
39pub struct FsBackendMountCmd {
40    /// Filesystem type.
41    pub fs_type: FsBackendType,
42    /// Mount source.
43    pub source: String,
44    /// Configuration information for the mount operation.
45    pub config: String,
46    /// Filesystem mountpoint.
47    pub mountpoint: String,
48    /// Optional prefetch file list.
49    pub prefetch_files: Option<Vec<String>>,
50}
51
52/// Request structure to unmount a filesystem instance.
53#[derive(Clone, Deserialize, Serialize, Debug)]
54pub struct FsBackendUmountCmd {
55    /// Filesystem mountpoint.
56    pub mountpoint: String,
57}
58
59/// List of [FsBackendDescriptor], providing filesystem metrics and statistics information.
60#[derive(Default, Serialize, Clone)]
61pub struct FsBackendCollection(HashMap<String, FsBackendDescriptor>);
62
63impl FsBackendCollection {
64    fn add(&mut self, id: &str, cmd: &FsBackendMountCmd) -> Result<()> {
65        // We only wash Rafs backend now.
66        let fs_config = match cmd.fs_type {
67            FsBackendType::Rafs => {
68                let cfg = ConfigV2::from_str(&cmd.config)
69                    .map_err(|e| Error::InvalidConfig(format!("{}", e)))?;
70                let cfg = cfg.clone_without_secrets();
71                Some(cfg)
72            }
73            FsBackendType::PassthroughFs => {
74                // Passthrough Fs has no configuration information.
75                None
76            }
77        };
78
79        let desc = FsBackendDescriptor {
80            backend_type: cmd.fs_type.clone(),
81            mountpoint: cmd.mountpoint.clone(),
82            mounted_time: time::OffsetDateTime::now_utc(),
83            config: fs_config,
84        };
85
86        self.0.insert(id.to_string(), desc);
87
88        Ok(())
89    }
90
91    pub fn del(&mut self, id: &str) {
92        self.0.remove(id);
93    }
94}
95
96/// Abstract interfaces for filesystem service provider.
97pub trait FsService: Send + Sync {
98    /// Get the [Vfs](https://docs.rs/fuse-backend-rs/latest/fuse_backend_rs/api/vfs/struct.Vfs.html)
99    /// object associated with the filesystem service object.
100    fn get_vfs(&self) -> &Vfs;
101
102    /// Get the [BackFileSystem](https://docs.rs/fuse-backend-rs/latest/fuse_backend_rs/api/vfs/type.BackFileSystem.html)
103    /// object associated with a mount point.
104    fn backend_from_mountpoint(&self, mp: &str) -> Result<Option<(Arc<BackFileSystem>, u8)>> {
105        self.get_vfs().get_rootfs(mp).map_err(|e| e.into())
106    }
107
108    /// Get handle to the optional upgrade manager.
109    fn upgrade_mgr(&self) -> Option<MutexGuard<'_, UpgradeManager>>;
110
111    /// Mount a new filesystem instance.
112    // NOTE: This method is not thread-safe, however, it is acceptable as
113    // mount/umount/remount/restore_mount is invoked from single thread in FSM
114    fn mount(&self, cmd: FsBackendMountCmd) -> Result<()> {
115        if self.backend_from_mountpoint(&cmd.mountpoint)?.is_some() {
116            return Err(Error::AlreadyExists);
117        }
118        let backend = fs_backend_factory(&cmd)?;
119        let index = self.get_vfs().mount(backend, &cmd.mountpoint)?;
120        info!("{} filesystem mounted at {}", &cmd.fs_type, &cmd.mountpoint);
121
122        if let Err(e) = self.backend_collection().add(&cmd.mountpoint, &cmd) {
123            warn!(
124                "failed to add filesystem instance to metrics manager, {}",
125                e
126            );
127        }
128        if let Some(mut mgr_guard) = self.upgrade_mgr() {
129            mgr_guard.add_mounts_state(cmd, index);
130            mgr_guard.save_vfs_stat(self.get_vfs())?;
131        }
132
133        Ok(())
134    }
135
136    /// Remount a filesystem instance.
137    fn remount(&self, cmd: FsBackendMountCmd) -> Result<()> {
138        let (rootfs, _) = self
139            .backend_from_mountpoint(&cmd.mountpoint)?
140            .ok_or(Error::NotFound)?;
141        let mut bootstrap = <dyn RafsIoRead>::from_file(&cmd.source)?;
142        let any_fs = rootfs.deref().as_any();
143        let rafs = any_fs
144            .downcast_ref::<Rafs>()
145            .ok_or_else(|| Error::FsTypeMismatch("RAFS".to_string()))?;
146        let rafs_cfg = ConfigV2::from_str(&cmd.config).map_err(RafsError::LoadConfig)?;
147        let rafs_cfg = Arc::new(rafs_cfg);
148
149        rafs.update(&mut bootstrap, &rafs_cfg)
150            .map_err(|e| match e {
151                RafsError::Unsupported => Error::Unsupported,
152                e => Error::Rafs(e),
153            })?;
154
155        // To update mounted time and backend configurations.
156        if let Err(e) = self.backend_collection().add(&cmd.mountpoint, &cmd) {
157            warn!(
158                "failed to update filesystem instance to metrics manager, {}",
159                e
160            );
161        }
162        // Update mounts opaque from UpgradeManager
163        if let Some(mut mgr_guard) = self.upgrade_mgr() {
164            mgr_guard.update_mounts_state(cmd)?;
165        }
166
167        Ok(())
168    }
169
170    /// Restore a filesystem instance.
171    fn restore_mount(&self, cmd: &FsBackendMountCmd, vfs_index: u8) -> Result<()> {
172        let backend = fs_backend_factory(cmd)?;
173        self.get_vfs()
174            .restore_mount(backend, vfs_index, &cmd.mountpoint)
175            .map_err(VfsError::RestoreMount)?;
176        self.backend_collection().add(&cmd.mountpoint, &cmd)?;
177        info!("backend fs restored at {}", cmd.mountpoint);
178        Ok(())
179    }
180
181    /// Umount a filesystem instance.
182    fn umount(&self, cmd: FsBackendUmountCmd) -> Result<()> {
183        let (fs, fs_idx) = self
184            .backend_from_mountpoint(&cmd.mountpoint)?
185            .ok_or(Error::NotFound)?;
186
187        if self.is_fuse() {
188            if let Some(rafs) = fs.deref().as_any().downcast_ref::<Rafs>() {
189                let root_ino = rafs.get_root_inode().unwrap();
190                self.walk_and_notify_invalidation(
191                    ROOT_ID,
192                    cmd.mountpoint.trim_start_matches('/'),
193                    root_ino,
194                    fs_idx,
195                )?;
196            }
197        }
198
199        drop(fs);
200
201        self.get_vfs().umount(&cmd.mountpoint)?;
202        self.backend_collection().del(&cmd.mountpoint);
203        if let Some(mut mgr_guard) = self.upgrade_mgr() {
204            // Remove mount opaque from UpgradeManager
205            mgr_guard.remove_mounts_state(cmd);
206            mgr_guard.save_vfs_stat(self.get_vfs())?;
207        }
208
209        debug!("try to gc unused blobs");
210        BLOB_FACTORY.gc(None);
211
212        Ok(())
213    }
214
215    /// Get list of metrics information objects about mounted filesystem instances.
216    fn backend_collection(&self) -> MutexGuard<'_, FsBackendCollection>;
217
218    /// Export information about the filesystem service.
219    fn export_backend_info(&self, mountpoint: &str) -> Result<String> {
220        let (fs, _) = self
221            .backend_from_mountpoint(mountpoint)?
222            .ok_or(Error::NotFound)?;
223        let any_fs = fs.deref().as_any();
224        let rafs = any_fs
225            .downcast_ref::<Rafs>()
226            .ok_or_else(|| Error::FsTypeMismatch("RAFS".to_string()))?;
227        let resp = serde_json::to_string(rafs.metadata()).map_err(Error::Serde)?;
228        Ok(resp)
229    }
230
231    /// Export metrics about in-flight operations.
232    fn export_inflight_ops(&self) -> Result<Option<String>>;
233
234    /// Recursively walk the inode tree and send cache invalidation notifications.
235    fn walk_and_notify_invalidation(
236        &self,
237        _parent_kernel_ino: u64,
238        _cur_name: &str,
239        _cur_inode: Arc<dyn RafsInode>,
240        _fs_idx: u8,
241    ) -> Result<()> {
242        Ok(())
243    }
244
245    /// Check whether the filesystem service is a FUSE service.
246    fn is_fuse(&self) -> bool {
247        false
248    }
249    /// Cast `self` to trait object of [Any] to support object downcast.
250    fn as_any(&self) -> &dyn Any;
251}
252
253/// Validate prefetch file list from user input.
254///
255/// Validation rules:
256/// - an item may be file or directory.
257/// - items must be separated by space, such as "<path1> <path2> <path3>".
258/// - each item must be absolute path, such as "/foo1/bar1 /foo2/bar2".
259fn validate_prefetch_file_list(input: &Option<Vec<String>>) -> Result<Option<Vec<PathBuf>>> {
260    if let Some(list) = input {
261        let list: Vec<PathBuf> = list.iter().map(PathBuf::from).collect();
262        for elem in list.iter() {
263            if !elem.is_absolute() {
264                return Err(Error::InvalidPrefetchList);
265            }
266        }
267        Ok(Some(list))
268    } else {
269        Ok(None)
270    }
271}
272
273fn fs_backend_factory(cmd: &FsBackendMountCmd) -> Result<BackFileSystem> {
274    let prefetch_files = validate_prefetch_file_list(&cmd.prefetch_files)?;
275
276    match cmd.fs_type {
277        FsBackendType::Rafs => {
278            let config = ConfigV2::from_str(cmd.config.as_str()).map_err(RafsError::LoadConfig)?;
279            let config = Arc::new(config);
280            let (mut rafs, reader) = Rafs::new(&config, &cmd.mountpoint, Path::new(&cmd.source))?;
281            rafs.import(reader, prefetch_files)?;
282
283            // Put a writable upper layer above the rafs to create an OverlayFS with two layers.
284            match &config.overlay {
285                Some(ovl_conf) => {
286                    // check workdir and upperdir params.
287                    if ovl_conf.work_dir.is_empty() || ovl_conf.upper_dir.is_empty() {
288                        return Err(Error::InvalidArguments(String::from(
289                            "workdir and upperdir must be specified for overlayfs",
290                        )));
291                    }
292
293                    // Create an overlay upper layer with passthroughfs.
294                    #[cfg(target_os = "macos")]
295                    return Err(Error::InvalidArguments(String::from(
296                        "not support OverlayFs since passthroughfs isn't supported on MacOS",
297                    )));
298                    #[cfg(target_os = "linux")]
299                    {
300                        let fs_cfg = passthrough_config {
301                            // Use upper_dir as root_dir as rw layer.
302                            root_dir: ovl_conf.upper_dir.clone(),
303                            do_import: true,
304                            writeback: true,
305                            no_open: true,
306                            no_opendir: true,
307                            xattr: true,
308                            cache_policy: CachePolicy::Always,
309                            ..Default::default()
310                        };
311                        let fsopts = FsOptions::WRITEBACK_CACHE
312                            | FsOptions::ZERO_MESSAGE_OPEN
313                            | FsOptions::ZERO_MESSAGE_OPENDIR;
314
315                        let passthrough_fs = PassthroughFs::<()>::new(fs_cfg)
316                            .map_err(|e| Error::InvalidConfig(format!("{}", e)))?;
317                        passthrough_fs.init(fsopts).map_err(Error::PassthroughFs)?;
318
319                        type BoxedLayer = Box<dyn Layer<Inode = u64, Handle = u64> + Send + Sync>;
320                        let upper_layer = Arc::new(Box::new(passthrough_fs) as BoxedLayer);
321
322                        // Create overlay lower layer with rafs, use lower_dir as root_dir of rafs.
323                        let lower_layers = vec![Arc::new(Box::new(rafs) as BoxedLayer)];
324
325                        let overlay_config = overlay_config {
326                            work: ovl_conf.work_dir.clone(),
327                            mountpoint: cmd.mountpoint.clone(),
328                            do_import: false,
329                            no_open: true,
330                            no_opendir: true,
331                            ..Default::default()
332                        };
333                        let overlayfs =
334                            OverlayFs::new(Some(upper_layer), lower_layers, overlay_config)
335                                .map_err(|e| Error::InvalidConfig(format!("{}", e)))?;
336                        info!(
337                            "init overlay fs inode, upper {}, work {}\n",
338                            ovl_conf.upper_dir.clone(),
339                            ovl_conf.work_dir.clone()
340                        );
341                        overlayfs
342                            .import()
343                            .map_err(|e| Error::InvalidConfig(format!("{}", e)))?;
344                        info!("Overlay filesystem imported");
345                        Ok(Box::new(overlayfs))
346                    }
347                }
348                None => {
349                    info!("RAFS filesystem imported");
350                    Ok(Box::new(rafs))
351                }
352            }
353        }
354        FsBackendType::PassthroughFs => {
355            #[cfg(target_os = "macos")]
356            return Err(Error::InvalidArguments(String::from(
357                "not support passthroughfs",
358            )));
359            #[cfg(target_os = "linux")]
360            {
361                // Vfs by default enables no_open and writeback, passthroughfs
362                // needs to specify them explicitly.
363                // TODO(liubo): enable no_open_dir.
364                let fs_cfg = passthrough_config {
365                    root_dir: cmd.source.to_string(),
366                    do_import: false,
367                    writeback: true,
368                    no_open: true,
369                    xattr: true,
370                    ..Default::default()
371                };
372                let passthrough_fs =
373                    PassthroughFs::<()>::new(fs_cfg).map_err(Error::PassthroughFs)?;
374                passthrough_fs.import().map_err(Error::PassthroughFs)?;
375                info!("PassthroughFs imported");
376                Ok(Box::new(passthrough_fs))
377            }
378        }
379    }
380}
381
#[cfg(test)]
mod tests {
    use super::*;

    /// Adding a RAFS mount registers one descriptor, deleting it empties the collection.
    #[test]
    fn it_should_add_new_backend() {
        let config = r#"{
                "version": 2,
                "id": "factory1",
                "backend": {
                    "type": "localfs",
                    "localfs": {
                        "dir": "/tmp/nydus"
                    }
                },
                "cache": {
                    "type": "fscache",
                    "fscache": {
                        "work_dir": "/tmp/nydus"
                    }
                },
                "metadata_path": "/tmp/nydus/bootstrap1"
            }"#;
        let cmd = FsBackendMountCmd {
            fs_type: FsBackendType::Rafs,
            config: config.to_string(),
            mountpoint: "testmonutount".to_string(),
            source: "testsource".to_string(),
            prefetch_files: Some(vec!["testfile".to_string()]),
        };

        let mut col = FsBackendCollection::default();
        let added = col.add("test", &cmd);
        assert!(added.is_ok(), "failed to add backend collection");
        assert_eq!(col.0.len(), 1);

        col.del("test");
        assert_eq!(col.0.len(), 0);
    }

    /// Absolute prefetch paths are accepted, relative ones are rejected.
    #[test]
    fn it_should_verify_prefetch_files() {
        let accepted = validate_prefetch_file_list(&Some(vec!["/etc/passwd".to_string()]));
        assert!(accepted.is_ok(), "failed to verify prefetch files");
        assert_eq!(1, accepted.unwrap().unwrap().len());

        let rejected = validate_prefetch_file_list(&Some(vec!["etc/passwd".to_string()]));
        assert!(rejected.is_err(), "should not pass verify");
    }

    /// A RAFS mount command without overlay yields a backend which downcasts to `Rafs`.
    #[test]
    fn it_should_create_rafs_backend() {
        let config = r#"
        {
            "device": {
              "backend": {
                "type": "oss",
                "config": {
                  "endpoint": "test",
                  "access_key_id": "test",
                  "access_key_secret": "test",
                  "bucket_name": "antsys-nydus",
                  "object_prefix":"nydus_v2/",
                  "scheme": "http"
                }
              }
            },
            "mode": "direct",
            "digest_validate": false,
            "enable_xattr": true,
            "fs_prefetch": {
              "enable": true,
              "threads_count": 10,
              "merging_size": 131072,
              "bandwidth_rate": 10485760
            }
          }"#;
        let bootstrap = "../tests/texture/bootstrap/nydusd_daemon_test_bootstrap";
        let cmd = FsBackendMountCmd {
            fs_type: FsBackendType::Rafs,
            config: config.to_string(),
            mountpoint: "testmountpoint".to_string(),
            source: bootstrap.to_string(),
            prefetch_files: Some(vec!["/testfile".to_string()]),
        };

        let backend = fs_backend_factory(&cmd).unwrap();
        assert!(
            backend.as_any().downcast_ref::<Rafs>().is_some(),
            "failed to create rafs backend"
        );
    }
}