1use std::any::Any;
10use std::cmp::PartialEq;
11use std::convert::From;
12use std::fmt::{Display, Formatter};
13use std::ops::Deref;
14use std::process;
15use std::sync::atomic::{AtomicBool, Ordering};
16use std::sync::mpsc::{Receiver, Sender};
17use std::sync::{Arc, Mutex, MutexGuard};
18use std::thread::{Builder, JoinHandle};
19
20use mio::{Events, Poll, Token, Waker};
21use nydus_api::BuildTimeInfo;
22use rust_fsm::*;
23use serde::{self, Serialize};
24
25use crate::fs_service::{FsBackendCollection, FsService};
26use crate::upgrade::UpgradeManager;
27use crate::{BlobCacheMgr, Error, Result};
28
29#[allow(clippy::upper_case_acronyms)]
31#[derive(Debug, Hash, PartialEq, Eq, Serialize)]
32pub enum DaemonState {
33 INIT = 1,
34 RUNNING = 2,
35 READY = 3,
36 STOPPED = 4,
37 UNKNOWN = 5,
38}
39
40impl Display for DaemonState {
41 fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
42 write!(f, "{:?}", self)
43 }
44}
45
46impl From<i32> for DaemonState {
47 fn from(i: i32) -> Self {
48 match i {
49 1 => DaemonState::INIT,
50 2 => DaemonState::RUNNING,
51 3 => DaemonState::READY,
52 4 => DaemonState::STOPPED,
53 _ => DaemonState::UNKNOWN,
54 }
55 }
56}
57
58#[derive(Serialize)]
60pub struct DaemonInfo {
61 pub version: BuildTimeInfo,
63 pub id: Option<String>,
65 pub supervisor: Option<String>,
67 pub state: DaemonState,
69 pub backend_collection: Option<FsBackendCollection>,
71}
72
73pub trait NydusDaemon: DaemonStateMachineSubscriber + Send + Sync {
78 fn as_any(&self) -> &dyn Any;
80
81 fn id(&self) -> Option<String>;
83
84 fn version(&self) -> BuildTimeInfo;
86
87 fn export_info(&self, include_fs_info: bool) -> Result<String> {
89 let mut response = DaemonInfo {
90 version: self.version(),
91 id: self.id(),
92 supervisor: self.supervisor(),
93 state: self.get_state(),
94 backend_collection: None,
95 };
96 if include_fs_info {
97 if let Some(fs) = self.get_default_fs_service() {
98 response.backend_collection = Some(fs.backend_collection().deref().clone());
99 }
100 }
101
102 serde_json::to_string(&response).map_err(Error::Serde)
103 }
104
105 fn get_state(&self) -> DaemonState;
107 fn set_state(&self, s: DaemonState);
109 fn start(&self) -> Result<()>;
111 fn umount(&self) -> Result<()>;
113 fn stop(&self) {}
115 fn trigger_stop(&self) -> Result<()> {
117 let s = self.get_state();
118
119 if s == DaemonState::STOPPED {
120 return Ok(());
121 }
122
123 if s == DaemonState::RUNNING {
124 self.on_event(DaemonStateMachineInput::Stop)?;
125 }
126
127 self.on_event(DaemonStateMachineInput::Stop)
128 }
129 fn trigger_exit(&self) -> Result<()> {
131 let s = self.get_state();
132
133 if s == DaemonState::STOPPED {
134 return Ok(());
135 }
136
137 if s == DaemonState::INIT {
138 return self.on_event(DaemonStateMachineInput::Stop);
139 }
140
141 if s == DaemonState::RUNNING {
142 self.on_event(DaemonStateMachineInput::Stop)?;
143 }
144
145 self.on_event(DaemonStateMachineInput::Exit)
146 }
147
148 fn wait(&self) -> Result<()>;
150 fn wait_service(&self) -> Result<()> {
152 Ok(())
153 }
154 fn wait_state_machine(&self) -> Result<()> {
156 Ok(())
157 }
158
159 fn supervisor(&self) -> Option<String>;
161 fn save(&self) -> Result<()>;
163 fn restore(&self) -> Result<()>;
165 fn trigger_takeover(&self) -> Result<()> {
167 self.on_event(DaemonStateMachineInput::Takeover)
168 }
169 fn trigger_start(&self) -> Result<()> {
171 self.on_event(DaemonStateMachineInput::Start)
172 }
173
174 fn upgrade_mgr(&self) -> Option<MutexGuard<'_, UpgradeManager>> {
175 None
176 }
177
178 fn get_default_fs_service(&self) -> Option<Arc<dyn FsService>> {
181 None
182 }
183
184 fn get_blob_cache_mgr(&self) -> Option<Arc<BlobCacheMgr>> {
186 None
187 }
188
189 fn delete_blob(&self, _blob_id: String) -> Result<()> {
191 Ok(())
192 }
193}
194
195state_machine! {
207 derive(Debug, Clone)
208 pub DaemonStateMachine(Init)
209
210 Init => {
211 Mount => Ready,
212 Takeover => Ready[Restore],
213 Stop => Die[StopStateMachine],
214 },
215 Ready => {
216 Start => Running[StartService],
217 Stop => Die[Umount],
218 Exit => Die[StopStateMachine],
219 },
220 Running => {
221 Stop => Ready [TerminateService],
222 },
223}
224
225pub struct DaemonStateMachineContext {
227 pid: u32,
228 daemon: Arc<dyn NydusDaemon>,
229 sm: StateMachine<DaemonStateMachine>,
230 request_receiver: Receiver<DaemonStateMachineInput>,
231 result_sender: Sender<Result<()>>,
232}
233
234impl DaemonStateMachineContext {
235 pub fn new(
237 daemon: Arc<dyn NydusDaemon>,
238 request_receiver: Receiver<DaemonStateMachineInput>,
239 result_sender: Sender<Result<()>>,
240 ) -> Self {
241 DaemonStateMachineContext {
242 pid: process::id(),
243 daemon,
244 sm: StateMachine::new(),
245 request_receiver,
246 result_sender,
247 }
248 }
249
250 pub fn kick_state_machine(self) -> Result<JoinHandle<std::io::Result<()>>> {
252 Builder::new()
253 .name("state_machine".to_string())
254 .spawn(move || self.run_state_machine_event_loop())
255 .map_err(Error::ThreadSpawn)
256 }
257
258 fn run_state_machine_event_loop(mut self) -> std::io::Result<()> {
259 loop {
260 use DaemonStateMachineOutput::*;
261 let event = self
262 .request_receiver
263 .recv()
264 .expect("Event channel can't be broken!");
265 let last = self.sm.state().clone();
266 let input = &event;
267
268 let action = if let Ok(a) = self.sm.consume(&event) {
269 a
270 } else {
271 error!(
272 "Wrong event input. Event={:?}, CurrentState={:?}",
273 input, &last
274 );
275 self.result_sender
277 .send(Err(Error::UnexpectedEvent(event)))
278 .unwrap();
279 continue;
280 };
281
282 let d = self.daemon.as_ref();
283 let cur = self.sm.state();
284 info!(
285 "State machine(pid={}): from {:?} to {:?}, input [{:?}], output [{:?}]",
286 &self.pid, last, cur, input, &action
287 );
288 let r = match action {
289 Some(StartService) => d.start().inspect(|_r| {
290 d.set_state(DaemonState::RUNNING);
291 }),
292 Some(TerminateService) => {
293 d.stop();
294 let res = d.wait_service();
295 if res.is_ok() {
296 d.set_state(DaemonState::READY);
297 }
298 res
299 }
300 Some(Umount) => d.umount().inspect(|_r| {
301 d.stop();
305 d.wait_service()
306 .unwrap_or_else(|e| error!("failed to wait service {}", e));
307 d.set_state(DaemonState::STOPPED);
309 }),
310 Some(Restore) => {
311 let res = d.restore();
312 if res.is_ok() {
313 d.set_state(DaemonState::READY);
314 }
315 res
316 }
317 Some(StopStateMachine) => {
318 d.set_state(DaemonState::STOPPED);
319 Ok(())
320 }
321 None => Ok(()),
323 };
324
325 self.result_sender.send(r).unwrap();
327 if d.get_state() == DaemonState::STOPPED {
329 break;
330 }
331 }
332
333 info!("state_machine thread exits");
334 Ok(())
335 }
336}
337
338pub trait DaemonStateMachineSubscriber {
340 fn on_event(&self, event: DaemonStateMachineInput) -> Result<()>;
344}
345
346pub struct DaemonController {
348 active: AtomicBool,
349 singleton_mode: AtomicBool,
350 daemon: Mutex<Option<Arc<dyn NydusDaemon>>>,
351 blob_cache_mgr: Mutex<Option<Arc<BlobCacheMgr>>>,
352 fs_service: Mutex<Option<Arc<dyn FsService>>>,
354 waker: Arc<Waker>,
355 poller: Mutex<Poll>,
356}
357
358impl DaemonController {
359 pub fn new() -> Self {
361 let poller = Poll::new().expect("Failed to create poller for DaemonController");
362 let waker = Waker::new(poller.registry(), Token(1))
363 .expect("Failed to create waker for DaemonController");
364
365 Self {
366 active: AtomicBool::new(true),
367 singleton_mode: AtomicBool::new(false),
368 daemon: Mutex::new(None),
369 blob_cache_mgr: Mutex::new(None),
370 fs_service: Mutex::new(None),
371 waker: Arc::new(waker),
372 poller: Mutex::new(poller),
373 }
374 }
375
376 pub fn is_active(&self) -> bool {
378 self.active.load(Ordering::Acquire)
379 }
380
381 pub fn alloc_waker(&self) -> Arc<Waker> {
383 self.waker.clone()
384 }
385
386 pub fn set_singleton_mode(&self, enabled: bool) {
388 self.singleton_mode.store(enabled, Ordering::Release);
389 }
390
391 pub fn set_daemon(&self, daemon: Arc<dyn NydusDaemon>) -> Option<Arc<dyn NydusDaemon>> {
393 self.daemon.lock().unwrap().replace(daemon)
394 }
395
396 pub fn get_daemon(&self) -> Arc<dyn NydusDaemon> {
400 self.daemon.lock().unwrap().clone().unwrap()
401 }
402
403 pub fn get_blob_cache_mgr(&self) -> Option<Arc<BlobCacheMgr>> {
405 self.blob_cache_mgr.lock().unwrap().clone()
406 }
407
408 pub fn set_blob_cache_mgr(&self, mgr: Arc<BlobCacheMgr>) -> Option<Arc<BlobCacheMgr>> {
410 self.blob_cache_mgr.lock().unwrap().replace(mgr)
411 }
412
413 pub fn set_fs_service(&self, service: Arc<dyn FsService>) -> Option<Arc<dyn FsService>> {
415 self.fs_service.lock().unwrap().replace(service)
416 }
417
418 pub fn get_fs_service(&self) -> Option<Arc<dyn FsService>> {
420 self.fs_service.lock().unwrap().clone()
421 }
422
423 pub fn notify_shutdown(&self) {
425 self.active.store(false, Ordering::Release);
427 let _ = self.waker.wake();
429 }
430
431 pub fn shutdown(&self) {
433 let daemon = self.daemon.lock().unwrap().take();
434 if let Some(d) = daemon {
435 if let Err(e) = d.trigger_stop() {
436 error!("failed to stop daemon: {}", e);
437 }
438 if let Err(e) = d.wait() {
439 error!("failed to wait daemon: {}", e)
440 }
441 }
442 }
443
444 pub fn run_loop(&self) {
446 let mut events = Events::with_capacity(8);
447
448 loop {
449 match self.poller.lock().unwrap().poll(&mut events, None) {
450 Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
451 Err(e) => error!("failed to receive notification from waker: {}", e),
452 Ok(_) => {}
453 }
454
455 for event in events.iter() {
456 if event.is_error() {
457 error!("Got error on the monitored event.");
458 continue;
459 }
460
461 if event.is_readable() && event.token() == Token(1) {
462 if !self.active.load(Ordering::Acquire) {
463 return;
464 } else if !self.singleton_mode.load(Ordering::Acquire) {
465 self.active.store(false, Ordering::Relaxed);
466 return;
467 }
468 }
469 }
470 }
471 }
472}
473
474impl Default for DaemonController {
475 fn default() -> Self {
476 DaemonController::new()
477 }
478}
479
480#[cfg(test)]
481mod tests {
482 use super::*;
483 use crate::FsBackendType;
484
485 #[test]
486 fn it_should_convert_int_to_daemonstate() {
487 let stat = DaemonState::from(1);
488 assert_eq!(stat, DaemonState::INIT);
489
490 let stat = DaemonState::from(2);
491 assert_eq!(stat, DaemonState::RUNNING);
492
493 let stat = DaemonState::from(3);
494 assert_eq!(stat, DaemonState::READY);
495
496 let stat = DaemonState::from(4);
497 assert_eq!(stat, DaemonState::STOPPED);
498
499 let stat = DaemonState::from(5);
500 assert_eq!(stat, DaemonState::UNKNOWN);
501
502 let stat = DaemonState::from(8);
503 assert_eq!(stat, DaemonState::UNKNOWN);
504 }
505
506 #[test]
507 fn it_should_convert_str_to_fsbackendtype() {
508 let backend_type: FsBackendType = "rafs".parse().unwrap();
509 assert_eq!(backend_type, FsBackendType::Rafs);
510
511 let backend_type: FsBackendType = "passthrough_fs".parse().unwrap();
512 assert_eq!(backend_type, FsBackendType::PassthroughFs);
513
514 assert!("xxxxxxxxxxxxx".parse::<FsBackendType>().is_err());
515 }
516}