nydus_service/
upgrade.rs

1// Copyright 2021 Ant Group. All rights reserved.
2//
3// SPDX-License-Identifier: Apache-2.0
4
5//! Online upgrade manager for Nydus daemons and filesystems.
6
7use std::any::TypeId;
8use std::collections::HashMap;
9use std::convert::{TryFrom, TryInto};
10use std::fs::File;
11use std::io;
12use std::os::fd::{AsRawFd, FromRawFd};
13use std::path::PathBuf;
14
15use nydus_api::BlobCacheEntry;
16use nydus_upgrade::backend::unix_domain_socket::UdsStorageBackend;
17use nydus_upgrade::backend::{StorageBackend, StorageBackendErr};
18
19use crate::fs_service::{FsBackendMountCmd, FsBackendUmountCmd};
20use crate::{Error, Result};
21use fuse_backend_rs::api::Vfs;
22use versionize::{VersionMap, Versionize, VersionizeResult};
23use versionize_derive::Versionize;
24
25/// Error codes related to upgrade manager.
26#[derive(thiserror::Error, Debug)]
27pub enum UpgradeMgrError {
28    #[error("missing supervisor path")]
29    MissingSupervisorPath,
30
31    #[error("failed to save/restore data via the backend, {0}")]
32    StorageBackendError(StorageBackendErr),
33    #[error("failed to serialize, {0}")]
34    Serialize(io::Error),
35    #[error("failed to deserialize, {0}")]
36    Deserialize(io::Error),
37    #[error("failed to clone file, {0}")]
38    CloneFile(io::Error),
39    #[error("failed to initialize fscache driver, {0}")]
40    InitializeFscache(io::Error),
41}
42
43impl From<UpgradeMgrError> for Error {
44    fn from(e: UpgradeMgrError) -> Self {
45        Error::UpgradeManager(e)
46    }
47}
48
/// FUSE fail-over policies.
///
/// Parsed from the exact lowercase strings `"none"`, `"flush"` and `"resend"`
/// via [`TryFrom`]. The enum is fieldless, so it is cheap to copy and can be
/// passed around by value.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum FailoverPolicy {
    /// Do nothing.
    None,
    /// Flush pending requests.
    Flush,
    /// Resend pending requests.
    Resend,
}
59
60impl TryFrom<&str> for FailoverPolicy {
61    type Error = std::io::Error;
62
63    fn try_from(p: &str) -> std::result::Result<Self, Self::Error> {
64        match p {
65            "none" => Ok(FailoverPolicy::None),
66            "flush" => Ok(FailoverPolicy::Flush),
67            "resend" => Ok(FailoverPolicy::Resend),
68            x => Err(einval!(format!("invalid FUSE fail-over mode {}", x))),
69        }
70    }
71}
72
73impl TryFrom<&String> for FailoverPolicy {
74    type Error = std::io::Error;
75
76    fn try_from(p: &String) -> std::result::Result<Self, Self::Error> {
77        p.as_str().try_into()
78    }
79}
80
/// In-memory fscache daemon state tracked by [`UpgradeManager`] for online upgrade.
struct FscacheState {
    // Blob cache entries keyed by "<domain_id>/<blob_id>" (see `add_blob_entry_state`).
    blob_entry_map: HashMap<String, BlobCacheEntry>,
    // Thread count recorded by `save_fscache_states`; `UpgradeManager::new` starts it at 1.
    threads: usize,
    // Path recorded by `save_fscache_states`; handed back to
    // `initialize_fscache_service` when the state is restored.
    path: String,
}
86
/// A recorded filesystem mount command together with the VFS index it was mounted at.
///
/// NOTE(review): this type is serialized through the derived `Versionize`
/// encoding; field order presumably affects the wire format, so keep it stable.
#[derive(Versionize, Clone, Debug)]
struct MountStateWrapper {
    // The mount command to replay via `restore_mount` after an upgrade.
    cmd: FsBackendMountCmd,
    // VFS index passed to `restore_mount` alongside `cmd`.
    vfs_index: u8,
}
92
/// In-memory FUSE daemon state tracked by [`UpgradeManager`] for online upgrade.
struct FusedevState {
    // Recorded mount commands keyed by their mountpoint.
    fs_mount_cmd_map: HashMap<String, MountStateWrapper>,
    // Serialized VFS state produced by `Vfs::save_to_bytes` (see `save_vfs_stat`).
    vfs_state_data: Vec<u8>,
    // FUSE connection id recorded by `save_fuse_cid`.
    fuse_conn_id: u64,
}
98
/// Online upgrade manager.
///
/// Records daemon state (fscache or FUSE) and an optional file handle so they
/// can be passed to a storage backend before an upgrade and fetched back after.
pub struct UpgradeManager {
    // State for an fscache daemon.
    fscache_deamon_stat: FscacheState,
    // State for a FUSE daemon.
    fuse_deamon_stat: FusedevState,
    // File whose raw fd is sent with the state in `save` and re-adopted in `restore`.
    file: Option<File>,
    // Backend that persists fds and serialized state; `new` uses a `UdsStorageBackend`.
    backend: Box<dyn StorageBackend>,
}
106
107impl UpgradeManager {
108    /// Create a new instance of [UpgradeManager].
109    pub fn new(socket_path: PathBuf) -> Self {
110        UpgradeManager {
111            fscache_deamon_stat: FscacheState {
112                blob_entry_map: HashMap::new(),
113                threads: 1,
114                path: "".to_string(),
115            },
116            fuse_deamon_stat: FusedevState {
117                fs_mount_cmd_map: HashMap::new(),
118                vfs_state_data: vec![],
119                fuse_conn_id: 0,
120            },
121            file: None,
122            backend: Box::new(UdsStorageBackend::new(socket_path)),
123        }
124    }
125    pub fn add_blob_entry_state(&mut self, entry: BlobCacheEntry) {
126        let mut blob_state_id = entry.domain_id.to_string();
127        blob_state_id.push('/');
128        blob_state_id.push_str(&entry.blob_id);
129
130        self.fscache_deamon_stat
131            .blob_entry_map
132            .insert(blob_state_id, entry);
133    }
134
135    pub fn remove_blob_entry_state(&mut self, domain_id: &str, blob_id: &str) {
136        let mut blob_state_id = domain_id.to_string();
137        blob_state_id.push('/');
138        // for no shared domain mode, snapshotter will call unbind without blob_id
139        if !blob_id.is_empty() {
140            blob_state_id.push_str(blob_id);
141        } else {
142            blob_state_id.push_str(domain_id);
143        }
144
145        if self
146            .fscache_deamon_stat
147            .blob_entry_map
148            .remove(&blob_state_id)
149            .is_none()
150        {
151            warn!("blob {}: state was not saved before!", blob_state_id)
152        }
153    }
154
155    pub fn save_fscache_states(&mut self, threads: usize, path: String) {
156        self.fscache_deamon_stat.path = path;
157        self.fscache_deamon_stat.threads = threads;
158    }
159
160    pub fn save_fuse_cid(&mut self, fuse_conn_id: u64) {
161        self.fuse_deamon_stat.fuse_conn_id = fuse_conn_id;
162    }
163
164    pub fn save_vfs_stat(&mut self, vfs: &Vfs) -> Result<()> {
165        let vfs_state_data = vfs.save_to_bytes().map_err(|e| {
166            let io_err = io::Error::new(
167                io::ErrorKind::Other,
168                format!("Failed to save vfs state: {:?}", e),
169            );
170            UpgradeMgrError::Serialize(io_err)
171        })?;
172        self.fuse_deamon_stat.vfs_state_data = vfs_state_data;
173        Ok(())
174    }
175
176    /// Add a filesystem instance into the upgrade manager.
177    pub fn add_mounts_state(&mut self, cmd: FsBackendMountCmd, vfs_index: u8) {
178        let cmd_wrapper = MountStateWrapper {
179            cmd: cmd.clone(),
180            vfs_index,
181        };
182        self.fuse_deamon_stat
183            .fs_mount_cmd_map
184            .insert(cmd.mountpoint, cmd_wrapper);
185    }
186
187    /// Update a filesystem instance in the upgrade manager.
188    pub fn update_mounts_state(&mut self, cmd: FsBackendMountCmd) -> Result<()> {
189        match self
190            .fuse_deamon_stat
191            .fs_mount_cmd_map
192            .get_mut(&cmd.mountpoint)
193        {
194            Some(cmd_wrapper) => {
195                cmd_wrapper.cmd = cmd;
196                Ok(())
197            }
198            None => Err(Error::NotFound),
199        }
200    }
201
202    /// Remove a filesystem instance from the upgrade manager.
203    pub fn remove_mounts_state(&mut self, cmd: FsBackendUmountCmd) {
204        if self
205            .fuse_deamon_stat
206            .fs_mount_cmd_map
207            .remove(&cmd.mountpoint)
208            .is_none()
209        {
210            warn!(
211                "mount state for {}: state was not saved before!",
212                cmd.mountpoint
213            )
214        }
215    }
216
217    /// Save the fd and daemon state data for online upgrade.
218    fn save(&mut self, data: &[u8]) -> Result<()> {
219        let mut fds = Vec::new();
220        if let Some(ref f) = self.file {
221            fds.push(f.as_raw_fd())
222        }
223
224        self.backend
225            .save(&fds, data)
226            .map_err(UpgradeMgrError::StorageBackendError)?;
227        Ok(())
228    }
229
230    /// Restore the fd and daemon state data for online upgrade.
231    fn restore(&mut self) -> Result<Vec<u8>> {
232        let (fds, state_data) = self
233            .backend
234            .restore()
235            .map_err(UpgradeMgrError::StorageBackendError)?;
236        if fds.len() != 1 {
237            warn!("Too many fds {}, we may not correctly handle it", fds.len());
238        }
239        self.file = Some(unsafe { File::from_raw_fd(fds[0]) });
240        Ok(state_data)
241    }
242
243    pub fn hold_file(&mut self, fd: &File) -> Result<()> {
244        let f = fd.try_clone().map_err(UpgradeMgrError::CloneFile)?;
245        self.file = Some(f);
246
247        Ok(())
248    }
249
250    pub fn return_file(&mut self) -> Option<File> {
251        if let Some(ref f) = self.file {
252            // Basically, this can hardly fail.
253            f.try_clone()
254                .map_err(|e| {
255                    error!("Clone file error, {}", e);
256                    e
257                })
258                .ok()
259        } else {
260            warn!("No file can be returned");
261            None
262        }
263    }
264}
265#[cfg(target_os = "linux")]
266/// Online upgrade utilities for fscache daemon.
267pub mod fscache_upgrade {
268    use std::convert::TryFrom;
269    use std::str::FromStr;
270
271    use super::*;
272    use crate::daemon::NydusDaemon;
273    use crate::singleton::ServiceController;
274    use nydus_upgrade::persist::Snapshotter;
275    use versionize::{VersionMap, Versionize, VersionizeResult};
276    use versionize_derive::Versionize;
277
278    #[derive(Versionize, Clone, Debug)]
279    pub struct BlobCacheEntryState {
280        json_str: String,
281    }
282
283    #[derive(Versionize, Clone, Default, Debug)]
284    pub struct FscacheBackendState {
285        blob_entry_list: Vec<(String, BlobCacheEntryState)>,
286        threads: usize,
287        path: String,
288    }
289
290    impl Snapshotter for FscacheBackendState {
291        fn get_versions() -> Vec<HashMap<TypeId, u16>> {
292            vec![
293                // version 1
294                HashMap::from([(FscacheBackendState::type_id(), 1)]),
295                // more versions for the future
296            ]
297        }
298    }
299
300    impl TryFrom<&FscacheBackendState> for FscacheState {
301        type Error = std::io::Error;
302        fn try_from(backend_stat: &FscacheBackendState) -> std::result::Result<Self, Self::Error> {
303            let mut map = HashMap::new();
304            for (id, entry_stat) in &backend_stat.blob_entry_list {
305                let entry = BlobCacheEntry::from_str(&entry_stat.json_str)?;
306                map.insert(id.to_string(), entry);
307            }
308            Ok(FscacheState {
309                blob_entry_map: map,
310                threads: backend_stat.threads,
311                path: backend_stat.path.clone(),
312            })
313        }
314    }
315
316    impl TryFrom<&FscacheState> for FscacheBackendState {
317        type Error = std::io::Error;
318        fn try_from(stat: &FscacheState) -> std::result::Result<Self, Self::Error> {
319            let mut list = Vec::new();
320            for (id, entry) in &stat.blob_entry_map {
321                let entry_stat = serde_json::to_string(&entry)?;
322                list.push((
323                    id.to_string(),
324                    BlobCacheEntryState {
325                        json_str: entry_stat,
326                    },
327                ));
328            }
329            Ok(FscacheBackendState {
330                blob_entry_list: list,
331                threads: stat.threads,
332                path: stat.path.clone(),
333            })
334        }
335    }
336
337    pub fn save(daemon: &ServiceController) -> Result<()> {
338        if let Some(mut mgr) = daemon.upgrade_mgr() {
339            let backend_stat = FscacheBackendState::try_from(&mgr.fscache_deamon_stat)
340                .map_err(UpgradeMgrError::Serialize)?;
341            let stat = backend_stat.save().map_err(UpgradeMgrError::Serialize)?;
342            mgr.save(&stat)?;
343        }
344        Ok(())
345    }
346
347    pub fn restore(daemon: &ServiceController) -> Result<()> {
348        if let Some(mut mgr) = daemon.upgrade_mgr() {
349            if let Some(blob_mgr) = daemon.get_blob_cache_mgr() {
350                // restore the mgr state via the backend in the mgr
351                let mut state_data = mgr.restore()?;
352
353                let backend_stat = FscacheBackendState::restore(&mut state_data)
354                    .map_err(UpgradeMgrError::Deserialize)?;
355
356                let stat =
357                    FscacheState::try_from(&backend_stat).map_err(UpgradeMgrError::Deserialize)?;
358                // restore blob entry
359                stat.blob_entry_map
360                    .iter()
361                    .try_for_each(|(_, entry)| -> Result<()> {
362                        blob_mgr
363                            .add_blob_entry(entry)
364                            .map_err(UpgradeMgrError::Deserialize)?;
365                        Ok(())
366                    })?;
367
368                // init fscache daemon with restored fd
369                if let Some(f) = mgr.return_file() {
370                    daemon
371                        .initialize_fscache_service(None, stat.threads, &stat.path, Some(&f))
372                        .map_err(UpgradeMgrError::InitializeFscache)?;
373                }
374
375                //restore upgrade manager fscache stat
376                mgr.fscache_deamon_stat = stat;
377                return Ok(());
378            }
379        }
380        Err(UpgradeMgrError::MissingSupervisorPath.into())
381    }
382}
383
384/// Online upgrade utilities for FUSE daemon.
385pub mod fusedev_upgrade {
386    use std::sync::atomic::Ordering;
387
388    use super::*;
389    use crate::daemon::NydusDaemon;
390    use crate::fusedev::{FusedevDaemon, FusedevFsService};
391    use nydus_upgrade::persist::Snapshotter;
392    use versionize::{VersionMap, Versionize, VersionizeResult};
393    use versionize_derive::Versionize;
394
395    #[derive(Versionize, Clone, Default, Debug)]
396    pub struct FusedevBackendState {
397        fs_mount_cmd_list: Vec<(String, MountStateWrapper)>,
398        vfs_state_data: Vec<u8>,
399        fuse_conn_id: u64,
400    }
401
402    impl Snapshotter for FusedevBackendState {
403        fn get_versions() -> Vec<HashMap<TypeId, u16>> {
404            vec![
405                // version 1
406                HashMap::from([(FusedevBackendState::type_id(), 1)]),
407                // more versions for the future
408            ]
409        }
410    }
411
412    impl From<&FusedevBackendState> for FusedevState {
413        fn from(backend_stat: &FusedevBackendState) -> Self {
414            let mut map = HashMap::new();
415            for (mp, mw) in &backend_stat.fs_mount_cmd_list {
416                map.insert(mp.to_string(), mw.clone());
417            }
418            FusedevState {
419                fs_mount_cmd_map: map,
420                vfs_state_data: backend_stat.vfs_state_data.clone(),
421                fuse_conn_id: backend_stat.fuse_conn_id,
422            }
423        }
424    }
425
426    impl From<&FusedevState> for FusedevBackendState {
427        fn from(stat: &FusedevState) -> Self {
428            let mut list = Vec::new();
429            for (mp, mw) in &stat.fs_mount_cmd_map {
430                list.push((mp.to_string(), mw.clone()));
431            }
432            FusedevBackendState {
433                fs_mount_cmd_list: list,
434                vfs_state_data: stat.vfs_state_data.clone(),
435                fuse_conn_id: stat.fuse_conn_id,
436            }
437        }
438    }
439
440    /// Save state information for a FUSE daemon.
441    pub fn save(daemon: &FusedevDaemon) -> Result<()> {
442        let svc = daemon.get_default_fs_service().ok_or(Error::NotFound)?;
443        let vfs = svc.get_vfs();
444        if !vfs.initialized() {
445            return Err(Error::NotReady);
446        }
447
448        let mut mgr = svc.upgrade_mgr().unwrap();
449        mgr.save_vfs_stat(vfs)?;
450
451        let backend_stat = FusedevBackendState::from(&mgr.fuse_deamon_stat);
452
453        let state = backend_stat.save().map_err(UpgradeMgrError::Serialize)?;
454        mgr.save(&state)?;
455
456        Ok(())
457    }
458
459    /// Restore state information for a FUSE daemon.
460    pub fn restore(daemon: &FusedevDaemon) -> Result<()> {
461        if daemon.supervisor.is_none() {
462            return Err(UpgradeMgrError::MissingSupervisorPath.into());
463        }
464
465        let svc = daemon.get_default_fs_service().ok_or(Error::NotFound)?;
466
467        let mut mgr = svc.upgrade_mgr().unwrap();
468
469        // restore the mgr state via the backend in the mgr
470        let mut state_data = mgr.restore()?;
471
472        let backend_state =
473            FusedevBackendState::restore(&mut state_data).map_err(UpgradeMgrError::Deserialize)?;
474
475        let mut state = FusedevState::from(&backend_state);
476
477        // restore the fuse daemon
478        svc.as_any()
479            .downcast_ref::<FusedevFsService>()
480            .unwrap()
481            .conn
482            .store(state.fuse_conn_id, Ordering::Release);
483
484        // restore fuse fd
485        if let Some(f) = mgr.return_file() {
486            let fuse_svc = svc.as_any().downcast_ref::<FusedevFsService>().unwrap();
487            fuse_svc.session.lock().unwrap().set_fuse_file(f);
488
489            // drain fuse requests
490            if let Err(e) = fuse_svc.drain_fuse_requests() {
491                warn!("Failed to drain fuse requests: {}", e);
492            }
493        }
494
495        // restore vfs
496        svc.get_vfs()
497            .restore_from_bytes(&mut state.vfs_state_data)?;
498        state
499            .fs_mount_cmd_map
500            .iter()
501            .try_for_each(|(_, mount_wrapper)| -> Result<()> {
502                svc.restore_mount(&mount_wrapper.cmd, mount_wrapper.vfs_index)?;
503                // as we are in upgrade stage and obtain the lock, `unwrap` is safe here
504                //mgr.add_mounts_state(cmd.clone(), *vfs_idx);
505                Ok(())
506            })?;
507
508        //restore upgrade manager fuse stat
509        mgr.fuse_deamon_stat = state;
510
511        Ok(())
512    }
513}
514
#[cfg(test)]
mod tests {
    use super::*;
    use crate::fs_service::{FsBackendMountCmd, FsBackendUmountCmd};
    #[cfg(target_os = "linux")]
    use crate::upgrade::fscache_upgrade::FscacheBackendState;
    use crate::upgrade::fusedev_upgrade::FusedevBackendState;
    use crate::FsBackendType;
    use nydus_upgrade::persist::Snapshotter;
    use vmm_sys_util::tempfile::TempFile;

    #[test]
    fn test_failover_policy() {
        assert_eq!(
            FailoverPolicy::try_from("none").unwrap(),
            FailoverPolicy::None
        );
        assert_eq!(
            FailoverPolicy::try_from("flush").unwrap(),
            FailoverPolicy::Flush
        );
        assert_eq!(
            FailoverPolicy::try_from("resend").unwrap(),
            FailoverPolicy::Resend
        );

        // Unknown names and wrong casing are rejected.
        let invalid = ["null", "flash", "Resend"];
        for s in invalid {
            assert!(FailoverPolicy::try_from(s).is_err());
        }

        let str = String::from("none");
        assert_eq!(
            FailoverPolicy::try_from(&str).unwrap(),
            FailoverPolicy::None
        );
        let str = String::from("flush");
        assert_eq!(
            FailoverPolicy::try_from(&str).unwrap(),
            FailoverPolicy::Flush
        );
        let str = String::from("resend");
        assert_eq!(
            FailoverPolicy::try_from(&str).unwrap(),
            FailoverPolicy::Resend
        );

        for s in invalid.iter().map(|s| s.to_string()) {
            assert!(FailoverPolicy::try_from(&s).is_err());
        }
    }

    #[test]
    #[cfg(target_os = "linux")]
    fn test_upgrade_manager_for_fscache() {
        let mut upgrade_mgr = UpgradeManager::new("dummy_socket".into());

        let content = r#"{
            "type": "bootstrap",
            "id": "blob1",
            "config": {
                "id": "cache1",
                "backend_type": "localfs",
                "backend_config": {},
                "cache_type": "fscache",
                "cache_config": {},
                "metadata_path": "/tmp/metadata1"
            },
            "domain_id": "domain1"
        }"#;
        let entry: BlobCacheEntry = serde_json::from_str(content).unwrap();
        upgrade_mgr.save_fscache_states(4, "/tmp/fscache_dir".to_string());
        assert_eq!(upgrade_mgr.fscache_deamon_stat.threads, 4);
        assert_eq!(upgrade_mgr.fscache_deamon_stat.path, "/tmp/fscache_dir");

        upgrade_mgr.add_blob_entry_state(entry);
        assert!(upgrade_mgr
            .fscache_deamon_stat
            .blob_entry_map
            .contains_key("domain1/blob1"));

        assert!(FscacheBackendState::try_from(&upgrade_mgr.fscache_deamon_stat).is_ok());

        let backend_stat = FscacheBackendState::try_from(&upgrade_mgr.fscache_deamon_stat).unwrap();
        assert!(backend_stat.save().is_ok());
        assert!(FscacheState::try_from(&backend_stat).is_ok());
        let stat = FscacheState::try_from(&backend_stat).unwrap();
        assert_eq!(stat.path, upgrade_mgr.fscache_deamon_stat.path);
        assert_eq!(stat.threads, upgrade_mgr.fscache_deamon_stat.threads);
        assert!(stat.blob_entry_map.contains_key("domain1/blob1"));

        upgrade_mgr.remove_blob_entry_state("domain1", "blob1");
        assert!(!upgrade_mgr
            .fscache_deamon_stat
            .blob_entry_map
            .contains_key("domain1/blob1"));

        // For no shared domain mode, unbind comes without a blob id and the
        // domain id is used in its place.
        let domain_entry: BlobCacheEntry =
            serde_json::from_str(&content.replace("\"blob1\"", "\"domain1\"")).unwrap();
        upgrade_mgr.add_blob_entry_state(domain_entry);
        assert!(upgrade_mgr
            .fscache_deamon_stat
            .blob_entry_map
            .contains_key("domain1/domain1"));
        upgrade_mgr.remove_blob_entry_state("domain1", "");
        assert!(!upgrade_mgr
            .fscache_deamon_stat
            .blob_entry_map
            .contains_key("domain1/domain1"));
    }

    #[test]
    fn test_upgrade_manager_for_fusedev() {
        let mut upgrade_mgr = UpgradeManager::new("dummy_socket".into());

        let config = r#"{
            "version": 2,
            "id": "factory1",
            "backend": {
                "type": "localfs",
                "localfs": {
                    "dir": "/tmp/nydus"
                }
            },
            "cache": {
                "type": "fscache",
                "fscache": {
                    "work_dir": "/tmp/nydus"
                }
            },
            "metadata_path": "/tmp/nydus/bootstrap1"
        }"#;
        let cmd = FsBackendMountCmd {
            fs_type: FsBackendType::Rafs,
            config: config.to_string(),
            mountpoint: "testmonutount".to_string(),
            source: "testsource".to_string(),
            prefetch_files: Some(vec!["testfile".to_string()]),
        };

        upgrade_mgr.save_fuse_cid(10);
        assert_eq!(upgrade_mgr.fuse_deamon_stat.fuse_conn_id, 10);
        upgrade_mgr.add_mounts_state(cmd.clone(), 5);
        assert!(upgrade_mgr
            .fuse_deamon_stat
            .fs_mount_cmd_map
            .contains_key("testmonutount"));
        assert!(upgrade_mgr.update_mounts_state(cmd.clone()).is_ok());

        let backend_stat = FusedevBackendState::from(&upgrade_mgr.fuse_deamon_stat);
        assert!(backend_stat.save().is_ok());

        let stat = FusedevState::from(&backend_stat);
        assert_eq!(stat.fuse_conn_id, upgrade_mgr.fuse_deamon_stat.fuse_conn_id);
        assert!(stat.fs_mount_cmd_map.contains_key("testmonutount"));
        assert_eq!(stat.fs_mount_cmd_map["testmonutount"].vfs_index, 5);

        let umount_cmd: FsBackendUmountCmd = FsBackendUmountCmd {
            mountpoint: "testmonutount".to_string(),
        };
        upgrade_mgr.remove_mounts_state(umount_cmd);
        assert!(!upgrade_mgr
            .fuse_deamon_stat
            .fs_mount_cmd_map
            .contains_key("testmonutount"));

        // Updating a mount that is no longer recorded must fail.
        assert!(upgrade_mgr.update_mounts_state(cmd).is_err());
    }

    #[test]
    fn test_upgrade_manager_hold_fd() {
        let mut upgrade_mgr = UpgradeManager::new("dummy_socket".into());

        let temp = TempFile::new().unwrap().into_file();
        assert!(upgrade_mgr.hold_file(&temp).is_ok());
        assert!(upgrade_mgr.return_file().is_some());

        let mut empty_mgr = UpgradeManager::new("dummy_socket".into());
        assert!(empty_mgr.return_file().is_none());
    }
}