ump-ng-server 0.4.1

Server message dispatch loop for ump-ng.
Documentation
use std::{
  sync::Arc,
  thread::JoinHandle,
  time::{Duration, Instant}
};

use parking_lot::{Condvar, Mutex};

/// Maximum number of milliseconds a single message may spend in the
/// `Processing` state before the monitor thread emits a warning that the
/// dispatcher was held up.
const MAX_PROC_MILLIS: u64 = 200;

/// Watchdog state, written by [`WatchDog`] methods and by the monitor thread.
enum State {
  /// Waiting for a message to arrive.
  Idle,

  /// A message has arrived and is being processed.
  Processing {
    /// Instant at which processing of the current message began; the monitor
    /// thread derives its timeout deadline from this value.
    start_time: Instant
  },

  /// Message processing has timed out.
  Timeout,

  /// Termination has been requested; the monitor thread leaves its loop when
  /// it observes this state.
  Term
}

/// Mutable watchdog data; stored inside the mutex in [`Shared`].
struct Inner {
  /// Current state of the watchdog state machine.
  state: State
}

/// Data shared between a [`WatchDog`] handle and its monitor thread.
struct Shared {
  /// The watchdog state, guarded by a mutex.
  inner: Mutex<Inner>,

  /// Condition variable notified every time a `WatchDog` method changes the
  /// state.
  signal: Condvar
}

/// Start a watchdog.
///
/// Creates the shared state (initially idle), spawns the monitor thread on
/// it and returns the [`WatchDog`] handle that owns both the shared state and
/// the thread's join handle.
pub fn run() -> WatchDog {
  let sh = Arc::new(Shared {
    inner: Mutex::new(Inner { state: State::Idle }),
    signal: Condvar::new()
  });

  // The monitor thread gets its own reference to the shared state.
  let monitor_sh = Arc::clone(&sh);
  let jh = std::thread::spawn(move || monitor_thread(&monitor_sh));

  WatchDog { sh, jh }
}

/// Handle to a running watchdog, as returned by [`run`].
pub struct WatchDog {
  /// State shared with the monitor thread.
  sh: Arc<Shared>,

  /// Join handle of the monitor thread; joined by `kill`.
  jh: JoinHandle<()>
}

impl WatchDog {
  pub(crate) fn begin_process(&self) {
    let mut g = self.sh.inner.lock();
    g.state = State::Processing {
      start_time: Instant::now()
    };
    self.sh.signal.notify_one();
    drop(g);
  }

  pub(crate) fn end_process(&self) {
    let mut g = self.sh.inner.lock();
    g.state = State::Idle;
    self.sh.signal.notify_one();
    drop(g);
  }

  pub(crate) fn kill(self) -> std::thread::Result<()> {
    let mut g = self.sh.inner.lock();
    g.state = State::Term;
    self.sh.signal.notify_one();
    drop(g);
    self.jh.join()
  }
}


/// Body of the watchdog monitor thread.
///
/// Runs a small state machine over the shared [`State`]:
///
/// - `Idle`: block until the condition variable is notified.
/// - `Processing`: block until notified or until `MAX_PROC_MILLIS` have
///   passed since `start_time`; on timeout switch to `Timeout`.
/// - `Timeout`: emit a warning and return to `Idle`.
/// - `Term`: leave the loop, ending the thread.
///
/// The state is re-examined after every wake-up, so spurious wake-ups are
/// harmless.
// The lock guard is deliberately held for the whole loop; it is only released
// while the thread is blocked inside the condition variable waits.
#[allow(clippy::significant_drop_tightening)]
fn monitor_thread(sh: &Arc<Shared>) {
  let mut g = sh.inner.lock();
  loop {
    match g.state {
      State::Idle => {
        // Wait to be notified about a state change
        sh.signal.wait(&mut g);
      }
      State::Processing { start_time } => {
        let deadline = start_time + Duration::from_millis(MAX_PROC_MILLIS);
        let timed_out = sh.signal.wait_until(&mut g, deadline).timed_out();

        // The mutex is released during the wait and reacquired afterwards.
        // Another thread may have changed the state between the timeout
        // firing and the lock being reacquired (for instance to `Term`), so
        // only report a timeout if the very same message is still being
        // processed.  Blindly overwriting the state here could discard a
        // termination request and leave the thread waiting forever.
        let same_message = matches!(
          g.state,
          State::Processing { start_time: current } if current == start_time
        );
        if timed_out && same_message {
          g.state = State::Timeout;
        }
      }
      State::Timeout => {
        #[cfg(feature = "tracing")]
        tracing::warn!(
          "Message processing held up the dispatcher more than {}ms",
          MAX_PROC_MILLIS
        );

        #[cfg(not(feature = "tracing"))]
        eprintln!(
          "Warning: Message processing held up the dispatcher more than {}ms",
          MAX_PROC_MILLIS
        );

        // Return to idle state so the warning is emitted only once per
        // timed-out message.
        g.state = State::Idle;
      }
      State::Term => {
        break;
      }
    }
  }
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :