nydus_service/
daemon.rs

1// Copyright 2020 Ant Group. All rights reserved.
2// Copyright (C) 2020-2022 Alibaba Cloud. All rights reserved.
3// Copyright 2019 Intel Corporation. All Rights Reserved.
4//
5// SPDX-License-Identifier: (Apache-2.0 AND BSD-3-Clause)
6
7//! Infrastructure to define and manage Nydus service daemons.
8
9use std::any::Any;
10use std::cmp::PartialEq;
11use std::convert::From;
12use std::fmt::{Display, Formatter};
13use std::ops::Deref;
14use std::process;
15use std::sync::atomic::{AtomicBool, Ordering};
16use std::sync::mpsc::{Receiver, Sender};
17use std::sync::{Arc, Mutex, MutexGuard};
18use std::thread::{Builder, JoinHandle};
19
20use mio::{Events, Poll, Token, Waker};
21use nydus_api::BuildTimeInfo;
22use rust_fsm::*;
23use serde::{self, Serialize};
24
25use crate::fs_service::{FsBackendCollection, FsService};
26use crate::upgrade::UpgradeManager;
27use crate::{BlobCacheMgr, Error, Result};
28
29/// Nydus daemon working states.
30#[allow(clippy::upper_case_acronyms)]
31#[derive(Debug, Hash, PartialEq, Eq, Serialize)]
32pub enum DaemonState {
33    INIT = 1,
34    RUNNING = 2,
35    READY = 3,
36    STOPPED = 4,
37    UNKNOWN = 5,
38}
39
40impl Display for DaemonState {
41    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
42        write!(f, "{:?}", self)
43    }
44}
45
46impl From<i32> for DaemonState {
47    fn from(i: i32) -> Self {
48        match i {
49            1 => DaemonState::INIT,
50            2 => DaemonState::RUNNING,
51            3 => DaemonState::READY,
52            4 => DaemonState::STOPPED,
53            _ => DaemonState::UNKNOWN,
54        }
55    }
56}
57
58/// Build, version and working state information for Nydus daemons.
59#[derive(Serialize)]
60pub struct DaemonInfo {
61    /// Build and version information.
62    pub version: BuildTimeInfo,
63    /// Optional daemon identifier.
64    pub id: Option<String>,
65    /// Optional daemon supervisor configuration information.
66    pub supervisor: Option<String>,
67    /// Daemon working state.
68    pub state: DaemonState,
69    /// Optional metrics and statistics about filesystem instances.
70    pub backend_collection: Option<FsBackendCollection>,
71}
72
73/// Abstract interfaces for Nydus daemon objects.
74///
75/// The [`NydusDaemon`] trait defines interfaces that an Nydus daemon object should implement,
76/// so the daemon manager can manage those objects.
77pub trait NydusDaemon: DaemonStateMachineSubscriber + Send + Sync {
78    /// Cast `self` to trait object of [Any] to support object downcast.
79    fn as_any(&self) -> &dyn Any;
80
81    /// Get optional daemon identifier.
82    fn id(&self) -> Option<String>;
83
84    /// Get build and version information.
85    fn version(&self) -> BuildTimeInfo;
86
87    /// Get status information about the daemon.
88    fn export_info(&self, include_fs_info: bool) -> Result<String> {
89        let mut response = DaemonInfo {
90            version: self.version(),
91            id: self.id(),
92            supervisor: self.supervisor(),
93            state: self.get_state(),
94            backend_collection: None,
95        };
96        if include_fs_info {
97            if let Some(fs) = self.get_default_fs_service() {
98                response.backend_collection = Some(fs.backend_collection().deref().clone());
99            }
100        }
101
102        serde_json::to_string(&response).map_err(Error::Serde)
103    }
104
105    /// Get daemon working state.
106    fn get_state(&self) -> DaemonState;
107    /// Set daemon working state.
108    fn set_state(&self, s: DaemonState);
109    /// Start the daemon object to serve incoming requests.
110    fn start(&self) -> Result<()>;
111    /// Umount the FUSE filesystem.
112    fn umount(&self) -> Result<()>;
113    /// Stop the daemon object.
114    fn stop(&self) {}
115    /// Trigger `Stop` transition event to stop the daemon.
116    fn trigger_stop(&self) -> Result<()> {
117        let s = self.get_state();
118
119        if s == DaemonState::STOPPED {
120            return Ok(());
121        }
122
123        if s == DaemonState::RUNNING {
124            self.on_event(DaemonStateMachineInput::Stop)?;
125        }
126
127        self.on_event(DaemonStateMachineInput::Stop)
128    }
129    /// Trigger transition events to move the state machine to `STOPPED` state.
130    fn trigger_exit(&self) -> Result<()> {
131        let s = self.get_state();
132
133        if s == DaemonState::STOPPED {
134            return Ok(());
135        }
136
137        if s == DaemonState::INIT {
138            return self.on_event(DaemonStateMachineInput::Stop);
139        }
140
141        if s == DaemonState::RUNNING {
142            self.on_event(DaemonStateMachineInput::Stop)?;
143        }
144
145        self.on_event(DaemonStateMachineInput::Exit)
146    }
147
148    /// Wait for daemon to exit.
149    fn wait(&self) -> Result<()>;
150    /// Wait for service worker thread to exit.
151    fn wait_service(&self) -> Result<()> {
152        Ok(())
153    }
154    /// Wait for state machine worker thread to exit.
155    fn wait_state_machine(&self) -> Result<()> {
156        Ok(())
157    }
158
159    /// Get supervisor configuration information.
160    fn supervisor(&self) -> Option<String>;
161    /// Save state for online upgrade.
162    fn save(&self) -> Result<()>;
163    /// Restore state for online upgrade.
164    fn restore(&self) -> Result<()>;
165    /// Trigger `Takeover` transition event to take over control from old instance.
166    fn trigger_takeover(&self) -> Result<()> {
167        self.on_event(DaemonStateMachineInput::Takeover)
168    }
169    /// Trigger `Start` transition event to start the new instance.
170    fn trigger_start(&self) -> Result<()> {
171        self.on_event(DaemonStateMachineInput::Start)
172    }
173
174    fn upgrade_mgr(&self) -> Option<MutexGuard<'_, UpgradeManager>> {
175        None
176    }
177
178    // For backward compatibility.
179    /// Set default filesystem service object.
180    fn get_default_fs_service(&self) -> Option<Arc<dyn FsService>> {
181        None
182    }
183
184    /// Get the optional `BlobCacheMgr` object.
185    fn get_blob_cache_mgr(&self) -> Option<Arc<BlobCacheMgr>> {
186        None
187    }
188
189    /// Delete a blob object managed by the daemon.
190    fn delete_blob(&self, _blob_id: String) -> Result<()> {
191        Ok(())
192    }
193}
194
// State machine for Nydus daemon workflow.
//
// Valid states for Nydus daemon state machine:
// - `Init` means nydusd has just started and is potentially well configured, but has not
//   yet negotiated the capabilities of both sides with the kernel. It has not even tried
//   to set up a fuse session by mounting `/dev/fuse` (in case of the `fusedev` backend).
// - `Ready` means nydusd is ready to start or die. The fuse session has been created.
// - `Running` means nydusd has successfully prepared everything needed to work as a
//   user-space fuse filesystem; however, the essential capabilities negotiation might not be
//   done yet. It relies on `fuse-rs` to tell whether capability negotiation is done.
// - `Die` state means the whole nydusd process is going to die.
206state_machine! {
207    derive(Debug, Clone)
208    pub DaemonStateMachine(Init)
209
210    Init => {
211        Mount => Ready,
212        Takeover => Ready[Restore],
213        Stop => Die[StopStateMachine],
214    },
215    Ready => {
216        Start => Running[StartService],
217        Stop => Die[Umount],
218        Exit => Die[StopStateMachine],
219    },
220    Running => {
221        Stop => Ready [TerminateService],
222    },
223}
224
225/// An implementation of the state machine defined by [`DaemonStateMachine`].
226pub struct DaemonStateMachineContext {
227    pid: u32,
228    daemon: Arc<dyn NydusDaemon>,
229    sm: StateMachine<DaemonStateMachine>,
230    request_receiver: Receiver<DaemonStateMachineInput>,
231    result_sender: Sender<Result<()>>,
232}
233
234impl DaemonStateMachineContext {
235    /// Create a new instance of [`DaemonStateMachineContext`].
236    pub fn new(
237        daemon: Arc<dyn NydusDaemon>,
238        request_receiver: Receiver<DaemonStateMachineInput>,
239        result_sender: Sender<Result<()>>,
240    ) -> Self {
241        DaemonStateMachineContext {
242            pid: process::id(),
243            daemon,
244            sm: StateMachine::new(),
245            request_receiver,
246            result_sender,
247        }
248    }
249
250    /// Create a worker thread to run event loop for the state machine.
251    pub fn kick_state_machine(self) -> Result<JoinHandle<std::io::Result<()>>> {
252        Builder::new()
253            .name("state_machine".to_string())
254            .spawn(move || self.run_state_machine_event_loop())
255            .map_err(Error::ThreadSpawn)
256    }
257
258    fn run_state_machine_event_loop(mut self) -> std::io::Result<()> {
259        loop {
260            use DaemonStateMachineOutput::*;
261            let event = self
262                .request_receiver
263                .recv()
264                .expect("Event channel can't be broken!");
265            let last = self.sm.state().clone();
266            let input = &event;
267
268            let action = if let Ok(a) = self.sm.consume(&event) {
269                a
270            } else {
271                error!(
272                    "Wrong event input. Event={:?}, CurrentState={:?}",
273                    input, &last
274                );
275                // Safe to unwrap because channel is never closed
276                self.result_sender
277                    .send(Err(Error::UnexpectedEvent(event)))
278                    .unwrap();
279                continue;
280            };
281
282            let d = self.daemon.as_ref();
283            let cur = self.sm.state();
284            info!(
285                "State machine(pid={}): from {:?} to {:?}, input [{:?}], output [{:?}]",
286                &self.pid, last, cur, input, &action
287            );
288            let r = match action {
289                Some(StartService) => d.start().inspect(|_r| {
290                    d.set_state(DaemonState::RUNNING);
291                }),
292                Some(TerminateService) => {
293                    d.stop();
294                    let res = d.wait_service();
295                    if res.is_ok() {
296                        d.set_state(DaemonState::READY);
297                    }
298                    res
299                }
300                Some(Umount) => d.umount().inspect(|_r| {
301                    // Always interrupt fuse service loop after shutdown connection to kernel.
302                    // In case that kernel does not really shutdown the session due to some reasons
303                    // causing service loop keep waiting of `/dev/fuse`.
304                    d.stop();
305                    d.wait_service()
306                        .unwrap_or_else(|e| error!("failed to wait service {}", e));
307                    // at least all fuse thread stopped, no matter what error each thread got
308                    d.set_state(DaemonState::STOPPED);
309                }),
310                Some(Restore) => {
311                    let res = d.restore();
312                    if res.is_ok() {
313                        d.set_state(DaemonState::READY);
314                    }
315                    res
316                }
317                Some(StopStateMachine) => {
318                    d.set_state(DaemonState::STOPPED);
319                    Ok(())
320                }
321                // With no output action involved, caller should also have reply back
322                None => Ok(()),
323            };
324
325            // Safe to unwrap because channel is never closed
326            self.result_sender.send(r).unwrap();
327            // Quit state machine thread if interrupted or stopped
328            if d.get_state() == DaemonState::STOPPED {
329                break;
330            }
331        }
332
333        info!("state_machine thread exits");
334        Ok(())
335    }
336}
337
338/// Handler to process state transition events emitted from the state machine.
339pub trait DaemonStateMachineSubscriber {
340    /// Event handler to process state transition events.
341    ///
342    /// It will be invoked in single-threaded context.
343    fn on_event(&self, event: DaemonStateMachineInput) -> Result<()>;
344}
345
346/// Controller to manage registered filesystem/blobcache/fscache services.
347pub struct DaemonController {
348    active: AtomicBool,
349    singleton_mode: AtomicBool,
350    daemon: Mutex<Option<Arc<dyn NydusDaemon>>>,
351    blob_cache_mgr: Mutex<Option<Arc<BlobCacheMgr>>>,
352    // For backward compatibility to support singleton fusedev/virtiofs server.
353    fs_service: Mutex<Option<Arc<dyn FsService>>>,
354    waker: Arc<Waker>,
355    poller: Mutex<Poll>,
356}
357
358impl DaemonController {
359    /// Create a new instance of [DaemonController].
360    pub fn new() -> Self {
361        let poller = Poll::new().expect("Failed to create poller for DaemonController");
362        let waker = Waker::new(poller.registry(), Token(1))
363            .expect("Failed to create waker for DaemonController");
364
365        Self {
366            active: AtomicBool::new(true),
367            singleton_mode: AtomicBool::new(false),
368            daemon: Mutex::new(None),
369            blob_cache_mgr: Mutex::new(None),
370            fs_service: Mutex::new(None),
371            waker: Arc::new(waker),
372            poller: Mutex::new(poller),
373        }
374    }
375
376    /// Check whether the service controller is still in active/working state.
377    pub fn is_active(&self) -> bool {
378        self.active.load(Ordering::Acquire)
379    }
380
381    /// Allocate a waker to notify stop events.
382    pub fn alloc_waker(&self) -> Arc<Waker> {
383        self.waker.clone()
384    }
385
386    /// Enable/disable singleton mode.
387    pub fn set_singleton_mode(&self, enabled: bool) {
388        self.singleton_mode.store(enabled, Ordering::Release);
389    }
390
391    /// Set the daemon service object.
392    pub fn set_daemon(&self, daemon: Arc<dyn NydusDaemon>) -> Option<Arc<dyn NydusDaemon>> {
393        self.daemon.lock().unwrap().replace(daemon)
394    }
395
396    /// Get the daemon service object.
397    ///
398    /// Panic if called before `set_daemon()` has been called.
399    pub fn get_daemon(&self) -> Arc<dyn NydusDaemon> {
400        self.daemon.lock().unwrap().clone().unwrap()
401    }
402
403    /// Get the optional blob cache manager.
404    pub fn get_blob_cache_mgr(&self) -> Option<Arc<BlobCacheMgr>> {
405        self.blob_cache_mgr.lock().unwrap().clone()
406    }
407
408    /// Set the optional blob cache manager.
409    pub fn set_blob_cache_mgr(&self, mgr: Arc<BlobCacheMgr>) -> Option<Arc<BlobCacheMgr>> {
410        self.blob_cache_mgr.lock().unwrap().replace(mgr)
411    }
412
413    /// Set the default fs service object.
414    pub fn set_fs_service(&self, service: Arc<dyn FsService>) -> Option<Arc<dyn FsService>> {
415        self.fs_service.lock().unwrap().replace(service)
416    }
417
418    /// Get the default fs service object.
419    pub fn get_fs_service(&self) -> Option<Arc<dyn FsService>> {
420        self.fs_service.lock().unwrap().clone()
421    }
422
423    /// Notify controller shutdown
424    pub fn notify_shutdown(&self) {
425        // Marking exiting state.
426        self.active.store(false, Ordering::Release);
427        // Signal the `run_loop()` working thread to exit.
428        let _ = self.waker.wake();
429    }
430
431    /// Shutdown all services managed by the controller.
432    pub fn shutdown(&self) {
433        let daemon = self.daemon.lock().unwrap().take();
434        if let Some(d) = daemon {
435            if let Err(e) = d.trigger_stop() {
436                error!("failed to stop daemon: {}", e);
437            }
438            if let Err(e) = d.wait() {
439                error!("failed to wait daemon: {}", e)
440            }
441        }
442    }
443
444    /// Run the event loop to handle service management events.
445    pub fn run_loop(&self) {
446        let mut events = Events::with_capacity(8);
447
448        loop {
449            match self.poller.lock().unwrap().poll(&mut events, None) {
450                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
451                Err(e) => error!("failed to receive notification from waker: {}", e),
452                Ok(_) => {}
453            }
454
455            for event in events.iter() {
456                if event.is_error() {
457                    error!("Got error on the monitored event.");
458                    continue;
459                }
460
461                if event.is_readable() && event.token() == Token(1) {
462                    if !self.active.load(Ordering::Acquire) {
463                        return;
464                    } else if !self.singleton_mode.load(Ordering::Acquire) {
465                        self.active.store(false, Ordering::Relaxed);
466                        return;
467                    }
468                }
469            }
470        }
471    }
472}
473
474impl Default for DaemonController {
475    fn default() -> Self {
476        DaemonController::new()
477    }
478}
479
#[cfg(test)]
mod tests {
    use super::*;
    use crate::FsBackendType;

    /// Integers 1 through 4 map to the named states; any other value maps to `UNKNOWN`.
    #[test]
    fn it_should_convert_int_to_daemonstate() {
        let cases = [
            (1, DaemonState::INIT),
            (2, DaemonState::RUNNING),
            (3, DaemonState::READY),
            (4, DaemonState::STOPPED),
            (5, DaemonState::UNKNOWN),
            (8, DaemonState::UNKNOWN),
        ];

        for (raw, expected) in cases {
            assert_eq!(DaemonState::from(raw), expected);
        }
    }

    /// Known backend names parse to their variants; an unknown name is rejected.
    #[test]
    fn it_should_convert_str_to_fsbackendtype() {
        assert_eq!(
            "rafs".parse::<FsBackendType>().unwrap(),
            FsBackendType::Rafs
        );
        assert_eq!(
            "passthrough_fs".parse::<FsBackendType>().unwrap(),
            FsBackendType::PassthroughFs
        );

        assert!("xxxxxxxxxxxxx".parse::<FsBackendType>().is_err());
    }
}