//! Idle monitor: a background thread that tells the receiver when no packet has arrived for a configured duration.
use std::{
sync::{atomic::Ordering, Arc},
thread::{self, JoinHandle},
time::{Duration, Instant}
};
use atomic_time::AtomicInstant;
use parking_lot::{Condvar, Mutex};
use crate::cmdsig::CmdSignal;
/// Mutex-protected state shared between the idle monitoring thread and its
/// controller.
pub(crate) struct Inner {
/// Termination request flag.  Set to `true` by `Shared::kill()`; the idle
/// monitoring thread returns once it observes it.
term: bool
}
/// State shared (through an `Arc`) between the idle monitoring thread and the
/// code that feeds it packet timestamps and shuts it down.
pub(crate) struct Shared {
/// Lock-protected state (currently only the termination flag).
inner: Mutex<Inner>,
/// Condition variable the idle monitoring thread waits on; notified by
/// `kill()`.
signal: Condvar,
/// The amount of time that must pass without receiving a packet in order to
/// trigger the idle callback.
dur: Duration,
/// Timestamp of the most recently received packet.  Initialized to the
/// construction time and updated by `touch()`.
last_pkt_inst: AtomicInstant,
/// Handle whose `idle()` method the monitoring thread calls when the idle
/// timeout is reached.
cmdsig: CmdSignal
}
impl Shared {
  /// Build the shared state for an idle monitor.
  ///
  /// `dur` is the quiet period that must elapse before idle is signalled, and
  /// `cmdsig` is stored for the monitoring thread to call `idle()` on.  The
  /// termination flag starts out cleared and the last-packet timestamp starts
  /// at the current time.
  pub(crate) fn new(dur: Duration, cmdsig: CmdSignal) -> Self {
    Self {
      inner: Mutex::new(Inner { term: false }),
      signal: Condvar::new(),
      dur,
      last_pkt_inst: AtomicInstant::now(),
      cmdsig
    }
  }

  /// Record that a packet was just received by stamping the last-packet
  /// timestamp with the current time.
  #[inline]
  pub(crate) fn touch(&self) {
    let now = Instant::now();
    self.last_pkt_inst.store(now, Ordering::Release);
  }

  /// Ask the idle monitoring thread to terminate.
  ///
  /// Sets the termination flag and wakes one waiter on the condition
  /// variable.
  pub(crate) fn kill(&self) {
    // The guard stays alive across the notification, so the flag update and
    // the wake-up both happen while the lock is held.
    let mut state = self.inner.lock();
    state.term = true;
    self.signal.notify_one();
    drop(state);
  }
}
/// Create the shared idle-monitor state and launch the monitoring thread.
///
/// `idle_dur` and `cmdsig` are forwarded to `Shared::new()`.  Returns the
/// shared state (for calling `touch()` / `kill()`) together with the join
/// handle of the spawned thread running `idle_thread()`.
pub(crate) fn run(
  idle_dur: Duration,
  cmdsig: CmdSignal
) -> (Arc<Shared>, JoinHandle<()>) {
  let shared = Arc::new(Shared::new(idle_dur, cmdsig));

  // The worker gets its own reference-counted handle to the shared state.
  let worker = {
    let shared = Arc::clone(&shared);
    thread::spawn(move || idle_thread(shared))
  };

  (shared, worker)
}
/// Keep track of when a specified amount of time has passed since the last
/// packet was received.
///
/// This is basically a debouncer for calling `idle()` when the sender has
/// calmed down: once `sh.dur` has elapsed since the most recent
/// `Shared::touch()`, the receiver thread is told (through `sh.cmdsig`) to
/// call the `idle()` callback.  The callback is requested at most once per
/// observed packet timestamp.
///
/// The function keeps the `sh.inner` lock guard for its whole lifetime
/// (including while calling `sh.cmdsig.idle()`), handing it to the condition
/// variable while waiting.  It returns once the termination flag set by
/// `Shared::kill()` is observed.
pub(crate) fn idle_thread(sh: Arc<Shared>) {
  let mut g = sh.inner.lock();
  if g.term {
    return;
  }

  // Timestamp of the packet for which `idle()` was most recently requested.
  // Seeded with the current time.
  let mut last_run = Instant::now();

  // Calculate when the idle state should trigger
  let mut at = sh.last_pkt_inst.load(Ordering::Acquire) + sh.dur;

  loop {
    let res = sh.signal.wait_until(&mut g, at);
    if g.term {
      //println!("Requested shutdown of idle monitoring thread");
      return;
    }

    if !res.timed_out() {
      // Woken up without a termination request and before the deadline --
      // recalculate the new `at`.
      at = Instant::now() + sh.dur;
      continue;
    }

    let last_pkt_inst = sh.last_pkt_inst.load(Ordering::Acquire);
    let now = Instant::now();
    let deadline = last_pkt_inst + sh.dur;

    if now < deadline {
      // The wait timed out, but a packet arrived in the meantime, so the
      // quiet period has not yet elapsed.  Firing here would signal idle
      // while the sender is still active; sleep until the quiet period
      // measured from that packet would end instead.
      at = deadline;
      continue;
    }

    if last_pkt_inst != last_run {
      // The quiet period has elapsed and the callback has not been called
      // since the last packet.
      // Tell the receiver thread that it should call the idle callback
      sh.cmdsig.idle();
      last_run = last_pkt_inst;
    }

    // Either idle was just signalled, or it had already been signalled for
    // this packet and nothing new has arrived.  Wait another full period;
    // a packet arriving during it is handled by the deadline check above.
    at = now + sh.dur;
  }
}
// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :