1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
use std::{
  sync::{atomic::Ordering, Arc},
  thread::{self, JoinHandle},
  time::{Duration, Instant}
};

use atomic_time::AtomicInstant;

use parking_lot::{Condvar, Mutex};

use crate::cmdsig::CmdSignal;


pub(crate) struct Inner {
  term: bool
}

pub(crate) struct Shared {
  inner: Mutex<Inner>,
  signal: Condvar,

  /// The amount of time that must pass without receiving a packet in order to
  /// trigger the idle callback.
  dur: Duration,

  last_pkt_inst: AtomicInstant,

  cmdsig: CmdSignal
}


impl Shared {
  pub(crate) fn new(dur: Duration, cmdsig: CmdSignal) -> Self {
    let inner = Inner { term: false };
    Shared {
      inner: Mutex::new(inner),
      signal: Condvar::new(),
      dur,
      last_pkt_inst: AtomicInstant::now(),
      cmdsig
    }
  }

  /// Update the last packet timestamp.
  #[inline]
  pub(crate) fn touch(&self) {
    self.last_pkt_inst.store(Instant::now(), Ordering::Release);
  }

  /// Terminate the idle monitoring thread.
  pub(crate) fn kill(&self) {
    let mut g = self.inner.lock();
    g.term = true;
    self.signal.notify_one();
  }
}

pub(crate) fn run(
  idle_dur: Duration,
  cmdsig: CmdSignal
) -> (Arc<Shared>, JoinHandle<()>) {
  let sh = Shared::new(idle_dur, cmdsig);
  let sh = Arc::new(sh);

  let sh2 = Arc::clone(&sh);
  let jh = thread::spawn(move || idle_thread(sh2));

  (sh, jh)
}


/// Keep track of when a specified amount of time has passed since last packet
/// was received.
///
/// This is basically a debouncer for calling `idle()` when the sender has
/// calmed down.
///
/// If the timeout is reached, tell the receiver thread to call the `idle()`
/// callback.
pub(crate) fn idle_thread(sh: Arc<Shared>) {
  let mut g = sh.inner.lock();
  if g.term {
    return;
  }
  let mut last_run = Instant::now();

  // Calculate when the idle state should trigger
  let mut at = sh.last_pkt_inst.load(Ordering::Acquire) + sh.dur;
  loop {
    let res = sh.signal.wait_until(&mut g, at);
    if g.term {
      //println!("Requested shutdown of idle monitoring thread");
      return;
    }
    let last_pkt_inst = sh.last_pkt_inst.load(Ordering::Acquire);
    if res.timed_out() {
      if last_pkt_inst == last_run {
        // A timeout occurred, but no new packet has arrived.  Just recalculate
        // `at` and redo from start.
        at = Instant::now() + sh.dur;
      } else {
        // A timeout occurred, and the callback has not been called since last
        // lacket.

        // Tell the receiver thread that it should call the idle callback
        sh.cmdsig.idle();

        last_run = last_pkt_inst;

        at = last_pkt_inst + sh.dur;
      }
    } else {
      // Time hasn't expired -- recalculate the new `at`
      at = Instant::now() + sh.dur;
    }
  }
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :