crb_runtime/
controller.rs

1use crate::interruptor::{InterruptionLevel, Interruptor};
2use derive_more::Deref;
3use futures::stream::{AbortHandle, AbortRegistration};
4use std::sync::{
5    Arc,
6    atomic::{AtomicBool, Ordering},
7};
8use thiserror::Error;
9
10#[derive(Error, Debug)]
11#[error("The registration has taken already")]
12pub struct RegistrationTaken;
13
14#[derive(Debug, Deref)]
15pub struct Controller {
16    pub registration: Option<AbortRegistration>,
17    #[deref]
18    pub stopper: Stopper,
19}
20
21impl Default for Controller {
22    fn default() -> Self {
23        let (handle, registration) = AbortHandle::new_pair();
24        let stopper = Stopper {
25            active: ActiveFlag::default(),
26            handle,
27        };
28        Self {
29            registration: Some(registration),
30            stopper,
31        }
32    }
33}
34
35impl Controller {
36    pub fn take_registration(&mut self) -> Result<AbortRegistration, RegistrationTaken> {
37        self.registration.take().ok_or(RegistrationTaken)
38    }
39}
40
41#[derive(Debug, Clone, Deref)]
42pub struct Stopper {
43    #[deref]
44    active: ActiveFlag,
45    handle: AbortHandle,
46}
47
48impl Stopper {
49    pub fn stop(&self, force: bool) {
50        self.active.flag.store(false, Ordering::Relaxed);
51        if force {
52            self.handle.abort();
53        }
54    }
55}
56
57impl Interruptor for Stopper {
58    fn interrupt(&self) {
59        self.stop(false);
60    }
61
62    fn interrupt_with_level(&self, level: InterruptionLevel) {
63        let force = level > InterruptionLevel::FLAG;
64        self.stop(force);
65    }
66}
67
/// Shared boolean flag reporting whether something is still active.
///
/// The value lives in an `Arc<AtomicBool>`, so every clone of an `ActiveFlag`
/// reads the same underlying flag.
#[derive(Clone, Debug)]
pub struct ActiveFlag {
    /// `true` while active; starts as `true` in `Default`.
    flag: Arc<AtomicBool>,
}

impl Default for ActiveFlag {
    /// Creates a flag that starts in the active (`true`) state.
    fn default() -> Self {
        let flag = Arc::new(AtomicBool::new(true));
        Self { flag }
    }
}

impl ActiveFlag {
    /// Returns the current value of the flag.
    ///
    /// The read uses `Ordering::Relaxed`: it is atomic, but implies no ordering
    /// with respect to other memory operations.
    pub fn is_active(&self) -> bool {
        AtomicBool::load(&self.flag, Ordering::Relaxed)
    }
}