sbp_settings/
context.rs

1use std::sync::{
2    atomic::{AtomicUsize, Ordering},
3    Arc,
4};
5use std::time::{Duration, Instant};
6
7use crossbeam_channel::{Receiver, Sender};
8
9pub struct Context {
10    pub(crate) cancel_rx: Receiver<()>,
11    pub(crate) timeout_rx: Receiver<Instant>,
12    timeout_at: Instant,
13    timeout_duration: Duration,
14    shared: Arc<Shared>,
15}
16
17impl Context {
18    pub fn new() -> (Context, CtxHandle) {
19        let (cancel_tx, cancel_rx) = crossbeam_channel::unbounded();
20        let timeout_rx = crossbeam_channel::never();
21        let timeout_duration = Duration::new(3_154_000_000, 0); // a century;
22        let timeout_at = Instant::now() + timeout_duration;
23        let shared = Arc::new(Shared {
24            cancel_tx,
25            num_chans: AtomicUsize::new(1),
26        });
27        (
28            Context {
29                cancel_rx,
30                timeout_rx,
31                timeout_at,
32                timeout_duration,
33                shared: Arc::clone(&shared),
34            },
35            CtxHandle { shared },
36        )
37    }
38
39    pub fn with_timeout(timeout: Duration) -> (Context, CtxHandle) {
40        let (mut ctx, h) = Self::new();
41        ctx.set_timeout(timeout);
42        (ctx, h)
43    }
44
45    pub fn set_timeout(&mut self, timeout: Duration) {
46        self.timeout_at = Instant::now() + timeout;
47        self.timeout_rx = crossbeam_channel::at(self.timeout_at);
48    }
49
50    pub fn reset_timeout(&mut self) {
51        self.set_timeout(self.timeout_duration);
52    }
53
54    pub fn cancel(&self) {
55        self.shared.cancel();
56    }
57}
58
59impl Clone for Context {
60    fn clone(&self) -> Self {
61        self.shared.num_chans.fetch_add(1, Ordering::SeqCst);
62        Self {
63            cancel_rx: self.cancel_rx.clone(),
64            timeout_rx: crossbeam_channel::at(self.timeout_at),
65            timeout_at: self.timeout_at,
66            timeout_duration: self.timeout_duration,
67            shared: Arc::clone(&self.shared),
68        }
69    }
70}
71
72pub struct CtxHandle {
73    shared: Arc<Shared>,
74}
75
76impl CtxHandle {
77    pub fn cancel(&self) {
78        self.shared.cancel();
79    }
80}
81
82struct Shared {
83    cancel_tx: Sender<()>,
84    num_chans: AtomicUsize,
85}
86
87impl Shared {
88    fn cancel(&self) {
89        for _ in 0..self.num_chans.load(Ordering::SeqCst) {
90            let _ = self.cancel_tx.try_send(());
91        }
92    }
93}