crb_runtime/
controller.rs1use crate::interruptor::{InterruptionLevel, Interruptor};
2use derive_more::Deref;
3use futures::stream::{AbortHandle, AbortRegistration};
4use std::sync::{
5 Arc,
6 atomic::{AtomicBool, Ordering},
7};
8use thiserror::Error;
9
10#[derive(Error, Debug)]
11#[error("The registration has taken already")]
12pub struct RegistrationTaken;
13
14#[derive(Debug, Deref)]
15pub struct Controller {
16 pub registration: Option<AbortRegistration>,
17 #[deref]
18 pub stopper: Stopper,
19}
20
21impl Default for Controller {
22 fn default() -> Self {
23 let (handle, registration) = AbortHandle::new_pair();
24 let stopper = Stopper {
25 active: ActiveFlag::default(),
26 handle,
27 };
28 Self {
29 registration: Some(registration),
30 stopper,
31 }
32 }
33}
34
35impl Controller {
36 pub fn take_registration(&mut self) -> Result<AbortRegistration, RegistrationTaken> {
37 self.registration.take().ok_or(RegistrationTaken)
38 }
39}
40
41#[derive(Debug, Clone, Deref)]
42pub struct Stopper {
43 #[deref]
44 active: ActiveFlag,
45 handle: AbortHandle,
46}
47
48impl Stopper {
49 pub fn stop(&self, force: bool) {
50 self.active.flag.store(false, Ordering::Relaxed);
51 if force {
52 self.handle.abort();
53 }
54 }
55}
56
57impl Interruptor for Stopper {
58 fn interrupt(&self) {
59 self.stop(false);
60 }
61
62 fn interrupt_with_level(&self, level: InterruptionLevel) {
63 let force = level > InterruptionLevel::FLAG;
64 self.stop(force);
65 }
66}
67
68#[derive(Debug, Clone)]
69pub struct ActiveFlag {
70 flag: Arc<AtomicBool>,
71}
72
73impl Default for ActiveFlag {
74 fn default() -> Self {
75 Self {
76 flag: Arc::new(AtomicBool::new(true)),
77 }
78 }
79}
80
81impl ActiveFlag {
82 pub fn is_active(&self) -> bool {
83 self.flag.load(Ordering::Relaxed)
84 }
85}