crb_runtime/
interruptor.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
use anyhow::Error;
use derive_more::Deref;
use futures::stream::{AbortHandle, AbortRegistration};
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};
use thiserror::Error;

#[derive(Debug, Clone)]
pub struct ActiveFlag {
    flag: Arc<AtomicBool>,
}

impl Default for ActiveFlag {
    fn default() -> Self {
        Self {
            flag: Arc::new(AtomicBool::new(true)),
        }
    }
}

impl ActiveFlag {
    pub fn is_active(&self) -> bool {
        self.flag.load(Ordering::Relaxed)
    }
}

#[derive(Error, Debug)]
#[error("The registration has taken already")]
pub struct RegistrationTaken;

#[derive(Debug, Deref)]
pub struct Controller {
    pub registration: Option<AbortRegistration>,
    #[deref]
    pub interruptor: Interruptor,
}

impl Default for Controller {
    fn default() -> Self {
        let (handle, registration) = AbortHandle::new_pair();
        let interruptor = Interruptor {
            active: ActiveFlag::default(),
            handle,
        };
        Self {
            registration: Some(registration),
            interruptor,
        }
    }
}

impl Controller {
    pub fn take_registration(&mut self) -> Result<AbortRegistration, RegistrationTaken> {
        self.registration.take().ok_or(RegistrationTaken)
    }
}

#[derive(Debug, Clone, Deref)]
pub struct Interruptor {
    #[deref]
    active: ActiveFlag,
    handle: AbortHandle,
}

impl Interruptor {
    pub fn stop(&self, force: bool) -> Result<(), Error> {
        self.active.flag.store(false, Ordering::Relaxed);
        if force {
            self.handle.abort();
        }
        Ok(())
    }
}