1use std::any::TypeId;
8use std::collections::HashMap;
9use std::convert::{TryFrom, TryInto};
10use std::fs::File;
11use std::io;
12use std::os::fd::{AsRawFd, FromRawFd};
13use std::path::PathBuf;
14
15use nydus_api::BlobCacheEntry;
16use nydus_upgrade::backend::unix_domain_socket::UdsStorageBackend;
17use nydus_upgrade::backend::{StorageBackend, StorageBackendErr};
18
19use crate::fs_service::{FsBackendMountCmd, FsBackendUmountCmd};
20use crate::{Error, Result};
21use fuse_backend_rs::api::Vfs;
22use versionize::{VersionMap, Versionize, VersionizeResult};
23use versionize_derive::Versionize;
24
25#[derive(thiserror::Error, Debug)]
27pub enum UpgradeMgrError {
28 #[error("missing supervisor path")]
29 MissingSupervisorPath,
30
31 #[error("failed to save/restore data via the backend, {0}")]
32 StorageBackendError(StorageBackendErr),
33 #[error("failed to serialize, {0}")]
34 Serialize(io::Error),
35 #[error("failed to deserialize, {0}")]
36 Deserialize(io::Error),
37 #[error("failed to clone file, {0}")]
38 CloneFile(io::Error),
39 #[error("failed to initialize fscache driver, {0}")]
40 InitializeFscache(io::Error),
41}
42
43impl From<UpgradeMgrError> for Error {
44 fn from(e: UpgradeMgrError) -> Self {
45 Error::UpgradeManager(e)
46 }
47}
48
/// FUSE fail-over policy.
///
/// The variants are parsed from the strings `"none"`, `"flush"` and `"resend"` by the
/// `TryFrom<&str>` implementation. The enum carries no data, so it is `Copy` and can be
/// passed around by value.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum FailoverPolicy {
    /// Selected by `"none"`.
    None,
    /// Selected by `"flush"` (presumably pending requests are flushed on fail-over;
    /// confirm against the code that consumes the policy).
    Flush,
    /// Selected by `"resend"` (presumably pending requests are resent on fail-over;
    /// confirm against the code that consumes the policy).
    Resend,
}
59
60impl TryFrom<&str> for FailoverPolicy {
61 type Error = std::io::Error;
62
63 fn try_from(p: &str) -> std::result::Result<Self, Self::Error> {
64 match p {
65 "none" => Ok(FailoverPolicy::None),
66 "flush" => Ok(FailoverPolicy::Flush),
67 "resend" => Ok(FailoverPolicy::Resend),
68 x => Err(einval!(format!("invalid FUSE fail-over mode {}", x))),
69 }
70 }
71}
72
73impl TryFrom<&String> for FailoverPolicy {
74 type Error = std::io::Error;
75
76 fn try_from(p: &String) -> std::result::Result<Self, Self::Error> {
77 p.as_str().try_into()
78 }
79}
80
81struct FscacheState {
82 blob_entry_map: HashMap<String, BlobCacheEntry>,
83 threads: usize,
84 path: String,
85}
86
87#[derive(Versionize, Clone, Debug)]
88struct MountStateWrapper {
89 cmd: FsBackendMountCmd,
90 vfs_index: u8,
91}
92
93struct FusedevState {
94 fs_mount_cmd_map: HashMap<String, MountStateWrapper>,
95 vfs_state_data: Vec<u8>,
96 fuse_conn_id: u64,
97}
98
99pub struct UpgradeManager {
101 fscache_deamon_stat: FscacheState,
102 fuse_deamon_stat: FusedevState,
103 file: Option<File>,
104 backend: Box<dyn StorageBackend>,
105}
106
107impl UpgradeManager {
108 pub fn new(socket_path: PathBuf) -> Self {
110 UpgradeManager {
111 fscache_deamon_stat: FscacheState {
112 blob_entry_map: HashMap::new(),
113 threads: 1,
114 path: "".to_string(),
115 },
116 fuse_deamon_stat: FusedevState {
117 fs_mount_cmd_map: HashMap::new(),
118 vfs_state_data: vec![],
119 fuse_conn_id: 0,
120 },
121 file: None,
122 backend: Box::new(UdsStorageBackend::new(socket_path)),
123 }
124 }
125 pub fn add_blob_entry_state(&mut self, entry: BlobCacheEntry) {
126 let mut blob_state_id = entry.domain_id.to_string();
127 blob_state_id.push('/');
128 blob_state_id.push_str(&entry.blob_id);
129
130 self.fscache_deamon_stat
131 .blob_entry_map
132 .insert(blob_state_id, entry);
133 }
134
135 pub fn remove_blob_entry_state(&mut self, domain_id: &str, blob_id: &str) {
136 let mut blob_state_id = domain_id.to_string();
137 blob_state_id.push('/');
138 if !blob_id.is_empty() {
140 blob_state_id.push_str(blob_id);
141 } else {
142 blob_state_id.push_str(domain_id);
143 }
144
145 if self
146 .fscache_deamon_stat
147 .blob_entry_map
148 .remove(&blob_state_id)
149 .is_none()
150 {
151 warn!("blob {}: state was not saved before!", blob_state_id)
152 }
153 }
154
155 pub fn save_fscache_states(&mut self, threads: usize, path: String) {
156 self.fscache_deamon_stat.path = path;
157 self.fscache_deamon_stat.threads = threads;
158 }
159
160 pub fn save_fuse_cid(&mut self, fuse_conn_id: u64) {
161 self.fuse_deamon_stat.fuse_conn_id = fuse_conn_id;
162 }
163
164 pub fn save_vfs_stat(&mut self, vfs: &Vfs) -> Result<()> {
165 let vfs_state_data = vfs.save_to_bytes().map_err(|e| {
166 let io_err = io::Error::new(
167 io::ErrorKind::Other,
168 format!("Failed to save vfs state: {:?}", e),
169 );
170 UpgradeMgrError::Serialize(io_err)
171 })?;
172 self.fuse_deamon_stat.vfs_state_data = vfs_state_data;
173 Ok(())
174 }
175
176 pub fn add_mounts_state(&mut self, cmd: FsBackendMountCmd, vfs_index: u8) {
178 let cmd_wrapper = MountStateWrapper {
179 cmd: cmd.clone(),
180 vfs_index,
181 };
182 self.fuse_deamon_stat
183 .fs_mount_cmd_map
184 .insert(cmd.mountpoint, cmd_wrapper);
185 }
186
187 pub fn update_mounts_state(&mut self, cmd: FsBackendMountCmd) -> Result<()> {
189 match self
190 .fuse_deamon_stat
191 .fs_mount_cmd_map
192 .get_mut(&cmd.mountpoint)
193 {
194 Some(cmd_wrapper) => {
195 cmd_wrapper.cmd = cmd;
196 Ok(())
197 }
198 None => Err(Error::NotFound),
199 }
200 }
201
202 pub fn remove_mounts_state(&mut self, cmd: FsBackendUmountCmd) {
204 if self
205 .fuse_deamon_stat
206 .fs_mount_cmd_map
207 .remove(&cmd.mountpoint)
208 .is_none()
209 {
210 warn!(
211 "mount state for {}: state was not saved before!",
212 cmd.mountpoint
213 )
214 }
215 }
216
217 fn save(&mut self, data: &[u8]) -> Result<()> {
219 let mut fds = Vec::new();
220 if let Some(ref f) = self.file {
221 fds.push(f.as_raw_fd())
222 }
223
224 self.backend
225 .save(&fds, data)
226 .map_err(UpgradeMgrError::StorageBackendError)?;
227 Ok(())
228 }
229
230 fn restore(&mut self) -> Result<Vec<u8>> {
232 let (fds, state_data) = self
233 .backend
234 .restore()
235 .map_err(UpgradeMgrError::StorageBackendError)?;
236 if fds.len() != 1 {
237 warn!("Too many fds {}, we may not correctly handle it", fds.len());
238 }
239 self.file = Some(unsafe { File::from_raw_fd(fds[0]) });
240 Ok(state_data)
241 }
242
243 pub fn hold_file(&mut self, fd: &File) -> Result<()> {
244 let f = fd.try_clone().map_err(UpgradeMgrError::CloneFile)?;
245 self.file = Some(f);
246
247 Ok(())
248 }
249
250 pub fn return_file(&mut self) -> Option<File> {
251 if let Some(ref f) = self.file {
252 f.try_clone()
254 .map_err(|e| {
255 error!("Clone file error, {}", e);
256 e
257 })
258 .ok()
259 } else {
260 warn!("No file can be returned");
261 None
262 }
263 }
264}
265#[cfg(target_os = "linux")]
266pub mod fscache_upgrade {
268 use std::convert::TryFrom;
269 use std::str::FromStr;
270
271 use super::*;
272 use crate::daemon::NydusDaemon;
273 use crate::singleton::ServiceController;
274 use nydus_upgrade::persist::Snapshotter;
275 use versionize::{VersionMap, Versionize, VersionizeResult};
276 use versionize_derive::Versionize;
277
278 #[derive(Versionize, Clone, Debug)]
279 pub struct BlobCacheEntryState {
280 json_str: String,
281 }
282
283 #[derive(Versionize, Clone, Default, Debug)]
284 pub struct FscacheBackendState {
285 blob_entry_list: Vec<(String, BlobCacheEntryState)>,
286 threads: usize,
287 path: String,
288 }
289
290 impl Snapshotter for FscacheBackendState {
291 fn get_versions() -> Vec<HashMap<TypeId, u16>> {
292 vec![
293 HashMap::from([(FscacheBackendState::type_id(), 1)]),
295 ]
297 }
298 }
299
300 impl TryFrom<&FscacheBackendState> for FscacheState {
301 type Error = std::io::Error;
302 fn try_from(backend_stat: &FscacheBackendState) -> std::result::Result<Self, Self::Error> {
303 let mut map = HashMap::new();
304 for (id, entry_stat) in &backend_stat.blob_entry_list {
305 let entry = BlobCacheEntry::from_str(&entry_stat.json_str)?;
306 map.insert(id.to_string(), entry);
307 }
308 Ok(FscacheState {
309 blob_entry_map: map,
310 threads: backend_stat.threads,
311 path: backend_stat.path.clone(),
312 })
313 }
314 }
315
316 impl TryFrom<&FscacheState> for FscacheBackendState {
317 type Error = std::io::Error;
318 fn try_from(stat: &FscacheState) -> std::result::Result<Self, Self::Error> {
319 let mut list = Vec::new();
320 for (id, entry) in &stat.blob_entry_map {
321 let entry_stat = serde_json::to_string(&entry)?;
322 list.push((
323 id.to_string(),
324 BlobCacheEntryState {
325 json_str: entry_stat,
326 },
327 ));
328 }
329 Ok(FscacheBackendState {
330 blob_entry_list: list,
331 threads: stat.threads,
332 path: stat.path.clone(),
333 })
334 }
335 }
336
337 pub fn save(daemon: &ServiceController) -> Result<()> {
338 if let Some(mut mgr) = daemon.upgrade_mgr() {
339 let backend_stat = FscacheBackendState::try_from(&mgr.fscache_deamon_stat)
340 .map_err(UpgradeMgrError::Serialize)?;
341 let stat = backend_stat.save().map_err(UpgradeMgrError::Serialize)?;
342 mgr.save(&stat)?;
343 }
344 Ok(())
345 }
346
347 pub fn restore(daemon: &ServiceController) -> Result<()> {
348 if let Some(mut mgr) = daemon.upgrade_mgr() {
349 if let Some(blob_mgr) = daemon.get_blob_cache_mgr() {
350 let mut state_data = mgr.restore()?;
352
353 let backend_stat = FscacheBackendState::restore(&mut state_data)
354 .map_err(UpgradeMgrError::Deserialize)?;
355
356 let stat =
357 FscacheState::try_from(&backend_stat).map_err(UpgradeMgrError::Deserialize)?;
358 stat.blob_entry_map
360 .iter()
361 .try_for_each(|(_, entry)| -> Result<()> {
362 blob_mgr
363 .add_blob_entry(entry)
364 .map_err(UpgradeMgrError::Deserialize)?;
365 Ok(())
366 })?;
367
368 if let Some(f) = mgr.return_file() {
370 daemon
371 .initialize_fscache_service(None, stat.threads, &stat.path, Some(&f))
372 .map_err(UpgradeMgrError::InitializeFscache)?;
373 }
374
375 mgr.fscache_deamon_stat = stat;
377 return Ok(());
378 }
379 }
380 Err(UpgradeMgrError::MissingSupervisorPath.into())
381 }
382}
383
384pub mod fusedev_upgrade {
386 use std::sync::atomic::Ordering;
387
388 use super::*;
389 use crate::daemon::NydusDaemon;
390 use crate::fusedev::{FusedevDaemon, FusedevFsService};
391 use nydus_upgrade::persist::Snapshotter;
392 use versionize::{VersionMap, Versionize, VersionizeResult};
393 use versionize_derive::Versionize;
394
395 #[derive(Versionize, Clone, Default, Debug)]
396 pub struct FusedevBackendState {
397 fs_mount_cmd_list: Vec<(String, MountStateWrapper)>,
398 vfs_state_data: Vec<u8>,
399 fuse_conn_id: u64,
400 }
401
402 impl Snapshotter for FusedevBackendState {
403 fn get_versions() -> Vec<HashMap<TypeId, u16>> {
404 vec![
405 HashMap::from([(FusedevBackendState::type_id(), 1)]),
407 ]
409 }
410 }
411
412 impl From<&FusedevBackendState> for FusedevState {
413 fn from(backend_stat: &FusedevBackendState) -> Self {
414 let mut map = HashMap::new();
415 for (mp, mw) in &backend_stat.fs_mount_cmd_list {
416 map.insert(mp.to_string(), mw.clone());
417 }
418 FusedevState {
419 fs_mount_cmd_map: map,
420 vfs_state_data: backend_stat.vfs_state_data.clone(),
421 fuse_conn_id: backend_stat.fuse_conn_id,
422 }
423 }
424 }
425
426 impl From<&FusedevState> for FusedevBackendState {
427 fn from(stat: &FusedevState) -> Self {
428 let mut list = Vec::new();
429 for (mp, mw) in &stat.fs_mount_cmd_map {
430 list.push((mp.to_string(), mw.clone()));
431 }
432 FusedevBackendState {
433 fs_mount_cmd_list: list,
434 vfs_state_data: stat.vfs_state_data.clone(),
435 fuse_conn_id: stat.fuse_conn_id,
436 }
437 }
438 }
439
440 pub fn save(daemon: &FusedevDaemon) -> Result<()> {
442 let svc = daemon.get_default_fs_service().ok_or(Error::NotFound)?;
443 let vfs = svc.get_vfs();
444 if !vfs.initialized() {
445 return Err(Error::NotReady);
446 }
447
448 let mut mgr = svc.upgrade_mgr().unwrap();
449 mgr.save_vfs_stat(vfs)?;
450
451 let backend_stat = FusedevBackendState::from(&mgr.fuse_deamon_stat);
452
453 let state = backend_stat.save().map_err(UpgradeMgrError::Serialize)?;
454 mgr.save(&state)?;
455
456 Ok(())
457 }
458
459 pub fn restore(daemon: &FusedevDaemon) -> Result<()> {
461 if daemon.supervisor.is_none() {
462 return Err(UpgradeMgrError::MissingSupervisorPath.into());
463 }
464
465 let svc = daemon.get_default_fs_service().ok_or(Error::NotFound)?;
466
467 let mut mgr = svc.upgrade_mgr().unwrap();
468
469 let mut state_data = mgr.restore()?;
471
472 let backend_state =
473 FusedevBackendState::restore(&mut state_data).map_err(UpgradeMgrError::Deserialize)?;
474
475 let mut state = FusedevState::from(&backend_state);
476
477 svc.as_any()
479 .downcast_ref::<FusedevFsService>()
480 .unwrap()
481 .conn
482 .store(state.fuse_conn_id, Ordering::Release);
483
484 if let Some(f) = mgr.return_file() {
486 let fuse_svc = svc.as_any().downcast_ref::<FusedevFsService>().unwrap();
487 fuse_svc.session.lock().unwrap().set_fuse_file(f);
488
489 if let Err(e) = fuse_svc.drain_fuse_requests() {
491 warn!("Failed to drain fuse requests: {}", e);
492 }
493 }
494
495 svc.get_vfs()
497 .restore_from_bytes(&mut state.vfs_state_data)?;
498 state
499 .fs_mount_cmd_map
500 .iter()
501 .try_for_each(|(_, mount_wrapper)| -> Result<()> {
502 svc.restore_mount(&mount_wrapper.cmd, mount_wrapper.vfs_index)?;
503 Ok(())
506 })?;
507
508 mgr.fuse_deamon_stat = state;
510
511 Ok(())
512 }
513}
514
#[cfg(test)]
mod tests {
    use super::*;
    use crate::fs_service::{FsBackendMountCmd, FsBackendUmountCmd};
    #[cfg(target_os = "linux")]
    use crate::upgrade::fscache_upgrade::FscacheBackendState;
    use crate::upgrade::fusedev_upgrade::FusedevBackendState;
    use crate::FsBackendType;
    use nydus_upgrade::persist::Snapshotter;
    use vmm_sys_util::tempfile::TempFile;

    /// Both `TryFrom` impls accept exactly the three lowercase names and reject
    /// everything else.
    #[test]
    fn test_failover_policy() {
        let accepted = [
            ("none", FailoverPolicy::None),
            ("flush", FailoverPolicy::Flush),
            ("resend", FailoverPolicy::Resend),
        ];
        let rejected = ["null", "flash", "Resend"];

        // Conversions from `&str`.
        for (name, expected) in accepted.iter() {
            assert_eq!(FailoverPolicy::try_from(*name).unwrap(), *expected);
        }
        for name in rejected.iter() {
            assert!(FailoverPolicy::try_from(*name).is_err());
        }

        // Conversions from `&String`.
        for (name, expected) in accepted.iter() {
            let owned = String::from(*name);
            assert_eq!(FailoverPolicy::try_from(&owned).unwrap(), *expected);
        }
        for name in rejected.iter() {
            let owned = name.to_string();
            assert!(FailoverPolicy::try_from(&owned).is_err());
        }
    }

    /// Fscache state survives a round trip through its persisted form, and blob
    /// entries can be added and removed.
    #[test]
    #[cfg(target_os = "linux")]
    fn test_upgrade_manager_for_fscache() {
        const BLOB_KEY: &str = "domain1/blob1";
        let mut upgrade_mgr = UpgradeManager::new("dummy_socket".into());

        let content = r#"{
            "type": "bootstrap",
            "id": "blob1",
            "config": {
                "id": "cache1",
                "backend_type": "localfs",
                "backend_config": {},
                "cache_type": "fscache",
                "cache_config": {},
                "metadata_path": "/tmp/metadata1"
            },
            "domain_id": "domain1"
        }"#;
        let entry: BlobCacheEntry = serde_json::from_str(content).unwrap();

        upgrade_mgr.save_fscache_states(4, "/tmp/fscache_dir".to_string());
        assert_eq!(upgrade_mgr.fscache_deamon_stat.threads, 4);
        assert_eq!(upgrade_mgr.fscache_deamon_stat.path, "/tmp/fscache_dir");

        upgrade_mgr.add_blob_entry_state(entry);
        assert!(upgrade_mgr
            .fscache_deamon_stat
            .blob_entry_map
            .contains_key(BLOB_KEY));

        let converted = FscacheBackendState::try_from(&upgrade_mgr.fscache_deamon_stat);
        assert!(converted.is_ok());
        let backend_stat = converted.unwrap();
        assert!(backend_stat.save().is_ok());

        let round_trip = FscacheState::try_from(&backend_stat);
        assert!(round_trip.is_ok());
        let stat = round_trip.unwrap();
        assert_eq!(stat.path, upgrade_mgr.fscache_deamon_stat.path);
        assert_eq!(stat.threads, upgrade_mgr.fscache_deamon_stat.threads);
        assert!(stat.blob_entry_map.contains_key(BLOB_KEY));

        upgrade_mgr.remove_blob_entry_state("domain1", "blob1");
        assert!(!upgrade_mgr
            .fscache_deamon_stat
            .blob_entry_map
            .contains_key(BLOB_KEY));
    }

    /// Fusedev state survives a round trip through its persisted form, and mounts
    /// can be added, updated and removed.
    #[test]
    fn test_upgrade_manager_for_fusedev() {
        const MOUNTPOINT: &str = "testmonutount";
        let mut upgrade_mgr = UpgradeManager::new("dummy_socket".into());

        let config = r#"{
            "version": 2,
            "id": "factory1",
            "backend": {
                "type": "localfs",
                "localfs": {
                    "dir": "/tmp/nydus"
                }
            },
            "cache": {
                "type": "fscache",
                "fscache": {
                    "work_dir": "/tmp/nydus"
                }
            },
            "metadata_path": "/tmp/nydus/bootstrap1"
        }"#;
        let cmd = FsBackendMountCmd {
            fs_type: FsBackendType::Rafs,
            config: config.to_string(),
            mountpoint: MOUNTPOINT.to_string(),
            source: "testsource".to_string(),
            prefetch_files: Some(vec!["testfile".to_string()]),
        };

        upgrade_mgr.save_fuse_cid(10);
        assert_eq!(upgrade_mgr.fuse_deamon_stat.fuse_conn_id, 10);

        upgrade_mgr.add_mounts_state(cmd.clone(), 5);
        assert!(upgrade_mgr
            .fuse_deamon_stat
            .fs_mount_cmd_map
            .contains_key(MOUNTPOINT));
        assert!(upgrade_mgr.update_mounts_state(cmd).is_ok());

        let backend_stat = FusedevBackendState::from(&upgrade_mgr.fuse_deamon_stat);
        assert!(backend_stat.save().is_ok());

        let stat = FusedevState::from(&backend_stat);
        assert_eq!(stat.fuse_conn_id, upgrade_mgr.fuse_deamon_stat.fuse_conn_id);
        assert!(stat.fs_mount_cmd_map.contains_key(MOUNTPOINT));

        let umount_cmd = FsBackendUmountCmd {
            mountpoint: MOUNTPOINT.to_string(),
        };
        upgrade_mgr.remove_mounts_state(umount_cmd);
        assert!(!upgrade_mgr
            .fuse_deamon_stat
            .fs_mount_cmd_map
            .contains_key(MOUNTPOINT));
    }

    /// A held file can be handed back as a duplicate.
    #[test]
    fn test_upgrade_manager_hold_fd() {
        let mut upgrade_mgr = UpgradeManager::new("dummy_socket".into());

        let temp = TempFile::new().unwrap().into_file();
        assert!(upgrade_mgr.hold_file(&temp).is_ok());
        assert!(upgrade_mgr.return_file().is_some());
    }
}