kaspa_core/
core.rs

1use crate::service::Service;
2use crate::signals::Shutdown;
3use crate::trace;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::{Arc, Mutex};
6
7pub struct Core {
8    pub keep_running: AtomicBool,
9    services: Mutex<Vec<Arc<dyn Service>>>,
10}
11
12impl Default for Core {
13    fn default() -> Self {
14        Self::new()
15    }
16}
17
18impl Core {
19    pub fn new() -> Core {
20        Core { keep_running: AtomicBool::new(true), services: Mutex::new(Vec::new()) }
21    }
22
23    pub fn bind<T>(&self, service: Arc<T>)
24    where
25        T: Service,
26    {
27        self.services.lock().unwrap().push(service);
28    }
29
30    pub fn find(&self, ident: &'static str) -> Option<Arc<dyn Service>> {
31        self.services.lock().unwrap().iter().find(|s| (*s).clone().ident() == ident).cloned()
32    }
33
34    /// Starts all services and blocks waiting to join them. For performing other operations in between
35    /// use start and join explicitly
36    pub fn run(self: &Arc<Core>) {
37        self.join(self.start());
38    }
39
40    /// Start all services and return `std::thread` join handles
41    pub fn start(self: &Arc<Core>) -> Vec<std::thread::JoinHandle<()>> {
42        let mut workers = Vec::new();
43        for service in self.services.lock().unwrap().iter() {
44            workers.append(&mut service.clone().start(self.clone()));
45        }
46        trace!("core is starting {} workers", workers.len());
47        workers
48    }
49
50    /// Join workers previously returned from `start`
51    pub fn join(&self, workers: Vec<std::thread::JoinHandle<()>>) {
52        for worker in workers {
53            match worker.join() {
54                Ok(()) => {}
55                Err(err) => {
56                    trace!("thread join failure: {:?}", err);
57                }
58            }
59        }
60
61        // Drop all services and cleanup
62        self.services.lock().unwrap().clear();
63
64        trace!("... core is shut down");
65    }
66}
67
68impl Shutdown for Core {
69    fn shutdown(self: &Arc<Core>) {
70        if self.keep_running.compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst).is_err() {
71            return;
72        }
73
74        trace!("signaling core shutdown...");
75
76        {
77            for service in self.services.lock().unwrap().iter() {
78                let ident = service.clone().ident();
79                trace!("shutting down: {}", ident);
80                service.clone().stop();
81            }
82        }
83
84        trace!("core is shutting down...");
85    }
86}