use super::*;
use std::cmp;
use std::sync::mpsc;
/// Periodically walks a set of token rows, asks the manager to refresh the
/// ones that are due and emits expiry/error warnings.
///
/// The scheduler owns nothing: every field is either a borrow or a plain
/// number, so the borrowed data must outlive the scheduler (`'a`).
pub struct RefreshScheduler<'a, T>
where
    T: 'a,
{
    /// One lockable row per managed token; the slice index is the row id
    /// that is put into the commands sent to the manager.
    rows: &'a [Mutex<TokenRow<T>>],
    /// Channel on which refresh commands are sent.
    sender: &'a mpsc::Sender<ManagerCommand<T>>,
    /// Minimum time in milliseconds between two warnings for the same row.
    min_notification_interval_ms: u64,
    /// Upper bound in milliseconds for one loop cycle (scheduling round plus sleep).
    max_cycle_dur_ms: u64,
    /// The loop keeps running for as long as this flag reads `true`.
    is_running: &'a AtomicBool,
    /// Time source used for all scheduling decisions.
    clock: &'a dyn Clock,
}
impl<'a, T: Eq + Ord + Send + Clone + Display> RefreshScheduler<'a, T> {
pub fn new(
rows: &'a [Mutex<TokenRow<T>>],
sender: &'a mpsc::Sender<ManagerCommand<T>>,
max_cycle_dur_ms: u64,
min_notification_interval_ms: u64,
is_running: &'a AtomicBool,
clock: &'a dyn Clock,
) -> Self {
RefreshScheduler {
rows,
sender,
min_notification_interval_ms,
max_cycle_dur_ms,
is_running,
clock,
}
}
pub fn start(&self) {
self.run_scheduler_loop();
}
fn run_scheduler_loop(&self) {
debug!("Starting scheduler loop");
while self.is_running.load(Ordering::Relaxed) {
let start = self.clock.now();
let next_scheduled_at = self.do_a_scheduling_round();
let elapsed = elapsed_millis_from(start, self.clock);
let sleep_dur_ms_regular = minus_millis(self.max_cycle_dur_ms, elapsed);
let sleep_next_scheduled_ms = diff_millis(self.clock.now(), next_scheduled_at);
let sleep_dur_ms = cmp::min(sleep_dur_ms_regular, sleep_next_scheduled_ms);
if sleep_dur_ms > 0 {
let sleep_dur = Duration::from_millis(sleep_dur_ms);
thread::sleep(sleep_dur);
}
}
info!("Scheduler loop exited.")
}
fn do_a_scheduling_round(&self) -> EpochMillis {
let mut next_at = u64::max_value();
let mut is_refresh_pending = false;
for (idx, row) in self.rows.iter().enumerate() {
let row = &mut *row.lock().unwrap();
if row.scheduled_for <= self.clock.now() {
is_refresh_pending = true;
row.token_state = match row.token_state {
TokenState::Uninitialized => {
if let Err(err) = self.sender
.send(ManagerCommand::ScheduledRefresh(idx, self.clock.now()))
{
error!("Could not send initial refresh command: {}", err);
break;
}
TokenState::Initializing
}
TokenState::Initializing => TokenState::Initializing,
TokenState::Ok => {
if let Err(err) = self.sender
.send(ManagerCommand::ScheduledRefresh(idx, self.clock.now()))
{
error!("Could not send regular refresh command: {}", err);
break;
}
TokenState::OkPending
}
TokenState::OkPending => TokenState::OkPending,
TokenState::Error => {
if let Err(err) = self.sender
.send(ManagerCommand::RefreshOnError(idx, self.clock.now()))
{
error!("Could not send refresh on error command: {}", err);
break;
}
TokenState::ErrorPending
}
TokenState::ErrorPending => TokenState::ErrorPending,
};
} else {
next_at = cmp::min(next_at, row.scheduled_for);
is_refresh_pending = is_refresh_pending || row.token_state.is_refresh_pending();
}
self.check_notifications(row);
}
if is_refresh_pending {
self.clock.now() + 50
} else {
next_at
}
}
fn check_notifications(&self, row: &mut TokenRow<T>) {
let now = self.clock.now();
let notify = if let Some(last_notified) = row.last_notification_at {
minus_millis(now, last_notified) >= self.min_notification_interval_ms
} else {
true
};
if notify {
let notified = match row.token_state {
TokenState::Error | TokenState::ErrorPending => {
warn!("Token '{}' is in error row.", row.token_id);
true
}
TokenState::Ok | TokenState::OkPending => {
if row.expires_at <= now {
warn!(
"Token '{}' expired {:.2} minutes ago.",
row.token_id,
(now - row.expires_at) as f64 / 60_000.0
);
true
} else if row.warn_at <= now {
warn!(
"Token '{}' expires in {:.2} minutes.",
row.token_id,
(row.expires_at - now) as f64 / 60_000.0
);
true
} else {
false
}
}
TokenState::Uninitialized | TokenState::Initializing => false,
};
if notified {
row.last_notification_at = Some(now);
}
}
}
}
#[cfg(test)]
mod test {
use super::*;
use std::cell::Cell;
use std::rc::Rc;
use std::sync::atomic::AtomicBool;
use std::sync::mpsc;
/// Manually driven clock for tests.
///
/// Clones share one underlying time value, so a test can keep one handle
/// and give another one to the code under test.
#[derive(Clone)]
struct TestClock {
    time: Rc<Cell<u64>>,
}

impl TestClock {
    /// Creates a clock that starts at time 0.
    pub fn new() -> Self {
        let time = Rc::new(Cell::new(0));
        Self { time }
    }

    /// Moves the clock forward by `by_ms` milliseconds.
    pub fn inc(&self, by_ms: u64) {
        self.time.set(self.time.get() + by_ms);
    }

    /// Sets the clock to exactly `ms`.
    pub fn set(&self, ms: u64) {
        let shared = &self.time;
        shared.set(ms);
    }
}
impl Clock for TestClock {
fn now(&self) -> u64 {
self.time.get()
}
}
// Placeholder token provider, used only to satisfy the group builder in
// `create_token_rows`.
struct DummyTokenProvider;
impl AccessTokenProvider for DummyTokenProvider {
// Panics via `unimplemented!()` if it is ever called.
fn request_access_token(&self, _scopes: &[Scope]) -> AccessTokenProviderResult {
unimplemented!()
}
}
/// Builds the rows for one group that manages the single token `"token"`
/// with scope `"scope"`, backed by the `DummyTokenProvider` stub.
fn create_token_rows() -> Vec<Mutex<TokenRow<&'static str>>> {
    let scopes = vec![Scope::new("scope")];
    let group = ManagedTokenGroupBuilder::single_token("token", scopes, DummyTokenProvider)
        .build()
        .unwrap();
    create_rows(vec![group], 0)
}
/// Advancing one `TestClock` handle is visible through its clone.
#[test]
fn clock_test() {
    let driver = TestClock::new();
    let observer = driver.clone();

    driver.inc(100);

    assert_eq!(100, observer.now());
}
/// A freshly created row carries the configured id and scopes, zeroed
/// timestamps, and has never been initialized or notified.
#[test]
// The thresholds are compared against the exact literals on purpose.
#[allow(clippy::float_cmp)]
fn initial_state_is_correct() {
    let token_rows = create_token_rows();
    let first = token_rows[0].lock().unwrap();

    // Identity and configuration.
    assert_eq!("token", first.token_id);
    assert_eq!(vec![Scope::new("scope")], first.scopes);
    assert_eq!(0.75, first.refresh_threshold);
    assert_eq!(0.85, first.warning_threshold);

    // Nothing is scheduled or has happened yet.
    assert_eq!(0, first.refresh_at);
    assert_eq!(0, first.warn_at);
    assert_eq!(0, first.expires_at);
    assert_eq!(0, first.scheduled_for);
    assert_eq!(TokenState::Uninitialized, first.token_state);
    assert_eq!(None, first.last_notification_at);
}
/// The scheduler requests the initial refresh exactly once. While nobody
/// answers, the row stays `Initializing` and no further command is sent.
#[test]
fn scheduler_sends_initial_refresh_while_nothing_happens() {
    let (tx, rx) = mpsc::channel();
    let is_running = AtomicBool::new(true);
    let clock = TestClock::new();
    let rows = create_token_rows();
    // Argument order: max_cycle_dur_ms = 0, min_notification_interval_ms = 1000.
    let scheduler = RefreshScheduler::new(&rows, &tx, 0, 1000, &is_running, &clock);

    // Before the first round the row is untouched.
    {
        let row = rows[0].lock().unwrap();
        assert_eq!(0, row.refresh_at);
        assert_eq!(0, row.warn_at);
        assert_eq!(0, row.expires_at);
        assert_eq!(0, row.scheduled_for);
        assert_eq!(TokenState::Uninitialized, row.token_state);
        assert_eq!(None, row.last_notification_at);
    }

    // First round: the row is due (scheduled_for = 0), so the initial
    // refresh is requested.
    clock.set(100);
    scheduler.do_a_scheduling_round();
    // The round sends synchronously on this thread, so the command is
    // already queued. `try_recv` makes a missing command fail the test
    // instead of blocking forever.
    let msg = rx.try_recv().unwrap();
    assert_eq!(ManagerCommand::ScheduledRefresh(0, 100), msg);
    {
        let row = rows[0].lock().unwrap();
        assert_eq!(0, row.refresh_at);
        assert_eq!(0, row.warn_at);
        assert_eq!(0, row.expires_at);
        assert_eq!(0, row.scheduled_for);
        assert_eq!(TokenState::Initializing, row.token_state);
        assert_eq!(None, row.last_notification_at);
    }

    // Second round: the refresh is still outstanding, so nothing new is
    // sent and the row does not change.
    clock.inc(1000);
    scheduler.do_a_scheduling_round();
    let msg = rx.try_recv();
    assert!(msg.is_err());
    {
        let row = rows[0].lock().unwrap();
        assert_eq!(0, row.refresh_at);
        assert_eq!(0, row.warn_at);
        assert_eq!(0, row.expires_at);
        assert_eq!(0, row.scheduled_for);
        assert_eq!(TokenState::Initializing, row.token_state);
        assert_eq!(None, row.last_notification_at);
    }
}
// Walks one row through the scheduler's state transitions using a manually
// driven clock.
//
// The scheduler is created with max_cycle_dur_ms = 0 and
// min_notification_interval_ms = 1000 (mind the argument order of
// `RefreshScheduler::new`). The test plays the part of the manager: it
// drains the channel and writes the outcome of each "refresh" into the row
// by hand.
#[test]
#[allow(clippy::cognitive_complexity)]
fn scheduler_workflow() {
let (tx, rx) = mpsc::channel();
let is_running = AtomicBool::new(true);
let clock = TestClock::new();
let rows = create_token_rows();
let scheduler = RefreshScheduler::new(&rows, &tx, 0, 1000, &is_running, &clock);
// Starting point: untouched row.
{
let row = rows[0].lock().unwrap();
assert_eq!(0, row.refresh_at);
assert_eq!(0, row.warn_at);
assert_eq!(0, row.expires_at);
assert_eq!(0, row.scheduled_for);
assert_eq!(TokenState::Uninitialized, row.token_state);
assert_eq!(None, row.last_notification_at);
}
// t = 100: the row is due (scheduled_for = 0), the initial refresh is
// requested and the row becomes Initializing.
clock.set(100);
scheduler.do_a_scheduling_round();
let msg = rx.try_recv().unwrap();
assert_eq!(ManagerCommand::ScheduledRefresh(0, 100), msg);
{
let row = rows[0].lock().unwrap();
assert_eq!(0, row.refresh_at);
assert_eq!(0, row.warn_at);
assert_eq!(0, row.expires_at);
assert_eq!(0, row.scheduled_for);
assert_eq!(TokenState::Initializing, row.token_state);
assert_eq!(None, row.last_notification_at);
}
// t = 1000: simulate a successful refresh: refresh at 8500, warn at 9500,
// expiry at 11000.
clock.set(1000);
{
let mut row = rows[0].lock().unwrap();
row.refresh_at = clock.now() + 7500;
row.warn_at = clock.now() + 8500;
row.expires_at = clock.now() + 10000;
row.scheduled_for = clock.now() + 7500;
row.token_state = TokenState::Ok;
}
// The row is not due yet: no command is sent and the row is left as written.
scheduler.do_a_scheduling_round();
let msg = rx.try_recv();
assert_eq!(true, msg.is_err());
{
let row = rows[0].lock().unwrap();
assert_eq!(8500, row.refresh_at);
assert_eq!(9500, row.warn_at);
assert_eq!(11000, row.expires_at);
assert_eq!(8500, row.scheduled_for);
assert_eq!(TokenState::Ok, row.token_state);
assert_eq!(None, row.last_notification_at);
}
// t = 1001: still nothing to do.
clock.set(1001);
scheduler.do_a_scheduling_round();
let msg = rx.try_recv();
assert_eq!(true, msg.is_err());
{
let row = rows[0].lock().unwrap();
assert_eq!(8500, row.refresh_at);
assert_eq!(9500, row.warn_at);
assert_eq!(11000, row.expires_at);
assert_eq!(8500, row.scheduled_for);
assert_eq!(TokenState::Ok, row.token_state);
assert_eq!(None, row.last_notification_at);
}
// t = 8499: one millisecond before the scheduled refresh, still nothing.
clock.set(8499);
scheduler.do_a_scheduling_round();
let msg = rx.try_recv();
assert_eq!(true, msg.is_err());
{
let row = rows[0].lock().unwrap();
assert_eq!(8500, row.refresh_at);
assert_eq!(9500, row.warn_at);
assert_eq!(11000, row.expires_at);
assert_eq!(8500, row.scheduled_for);
assert_eq!(TokenState::Ok, row.token_state);
assert_eq!(None, row.last_notification_at);
}
// t = 8500: scheduled_for is reached (the comparison is `<=`), a regular
// refresh is requested and the row becomes OkPending.
clock.set(8500);
scheduler.do_a_scheduling_round();
let msg = rx.try_recv().unwrap();
assert_eq!(ManagerCommand::ScheduledRefresh(0, 8500), msg);
{
let row = rows[0].lock().unwrap();
assert_eq!(8500, row.refresh_at);
assert_eq!(9500, row.warn_at);
assert_eq!(11000, row.expires_at);
assert_eq!(8500, row.scheduled_for);
assert_eq!(TokenState::OkPending, row.token_state);
assert_eq!(None, row.last_notification_at);
}
// t = 9499: the refresh is still pending, so no second command is sent;
// warn_at has not been reached yet, so no notification either.
clock.set(9499);
scheduler.do_a_scheduling_round();
let msg = rx.try_recv();
assert_eq!(true, msg.is_err());
{
let row = rows[0].lock().unwrap();
assert_eq!(8500, row.refresh_at);
assert_eq!(9500, row.warn_at);
assert_eq!(11000, row.expires_at);
assert_eq!(8500, row.scheduled_for);
assert_eq!(TokenState::OkPending, row.token_state);
assert_eq!(None, row.last_notification_at);
}
// t = 9500: warn_at is reached, the first warning is recorded.
clock.set(9500);
scheduler.do_a_scheduling_round();
let msg = rx.try_recv();
assert_eq!(true, msg.is_err());
{
let row = rows[0].lock().unwrap();
assert_eq!(8500, row.refresh_at);
assert_eq!(9500, row.warn_at);
assert_eq!(11000, row.expires_at);
assert_eq!(8500, row.scheduled_for);
assert_eq!(TokenState::OkPending, row.token_state);
assert_eq!(Some(9500), row.last_notification_at);
}
// t = 10499: only 999 ms since the last warning, which is below the
// 1000 ms minimum interval, so no new warning is recorded.
clock.set(10499);
scheduler.do_a_scheduling_round();
let msg = rx.try_recv();
assert_eq!(true, msg.is_err());
{
let row = rows[0].lock().unwrap();
assert_eq!(8500, row.refresh_at);
assert_eq!(9500, row.warn_at);
assert_eq!(11000, row.expires_at);
assert_eq!(8500, row.scheduled_for);
assert_eq!(TokenState::OkPending, row.token_state);
assert_eq!(Some(9500), row.last_notification_at);
}
// t = 10500: exactly 1000 ms since the last warning, so the next warning
// is recorded.
clock.set(10500);
scheduler.do_a_scheduling_round();
let msg = rx.try_recv();
assert_eq!(true, msg.is_err());
{
let row = rows[0].lock().unwrap();
assert_eq!(8500, row.refresh_at);
assert_eq!(9500, row.warn_at);
assert_eq!(11000, row.expires_at);
assert_eq!(8500, row.scheduled_for);
assert_eq!(TokenState::OkPending, row.token_state);
assert_eq!(Some(10500), row.last_notification_at);
}
// t = 10600: simulate the pending refresh succeeding; new deadlines are
// relative to 10600.
clock.set(10600);
{
let mut row = rows[0].lock().unwrap();
row.refresh_at = clock.now() + 7500;
row.warn_at = clock.now() + 8500;
row.expires_at = clock.now() + 10000;
row.scheduled_for = clock.now() + 7500;
row.token_state = TokenState::Ok;
}
// Not due and not in the warning window: nothing is sent and
// last_notification_at keeps its old value.
scheduler.do_a_scheduling_round();
let msg = rx.try_recv();
assert_eq!(true, msg.is_err());
{
let row = rows[0].lock().unwrap();
assert_eq!(10600 + 7500, row.refresh_at);
assert_eq!(10600 + 8500, row.warn_at);
assert_eq!(10600 + 10000, row.expires_at);
assert_eq!(10600 + 7500, row.scheduled_for);
assert_eq!(TokenState::Ok, row.token_state);
assert_eq!(Some(10500), row.last_notification_at);
}
// t = 18100: the next regular refresh is due and requested. warn_at
// (19100) has not been reached, so no warning is recorded.
clock.set(18100);
scheduler.do_a_scheduling_round();
let msg = rx.try_recv().unwrap();
assert_eq!(ManagerCommand::ScheduledRefresh(0, 18100), msg);
{
let row = rows[0].lock().unwrap();
assert_eq!(10600 + 7500, row.refresh_at);
assert_eq!(10600 + 8500, row.warn_at);
assert_eq!(10600 + 10000, row.expires_at);
assert_eq!(10600 + 7500, row.scheduled_for);
assert_eq!(TokenState::OkPending, row.token_state);
assert_eq!(Some(10500), row.last_notification_at);
}
// t = 20000: simulate the refresh failing; a retry is scheduled 100 ms later.
clock.set(20000);
{
let mut row = rows[0].lock().unwrap();
row.refresh_at = clock.now();
row.warn_at = clock.now();
row.expires_at = clock.now();
row.scheduled_for = clock.now() + 100;
row.token_state = TokenState::Error;
}
// The retry is not due yet, so nothing is sent, but the error state
// triggers a warning, recorded at 20000.
scheduler.do_a_scheduling_round();
let msg = rx.try_recv();
assert_eq!(true, msg.is_err());
{
let row = rows[0].lock().unwrap();
assert_eq!(20000, row.refresh_at);
assert_eq!(20000, row.warn_at);
assert_eq!(20000, row.expires_at);
assert_eq!(20100, row.scheduled_for);
assert_eq!(TokenState::Error, row.token_state);
assert_eq!(Some(20000), row.last_notification_at);
}
// t = 20100: the retry is due; RefreshOnError is sent and the row becomes
// ErrorPending. Only 100 ms have passed since the last warning, so no new
// one is recorded.
clock.set(20100);
scheduler.do_a_scheduling_round();
let msg = rx.try_recv().unwrap();
assert_eq!(ManagerCommand::RefreshOnError(0, 20100), msg);
{
let row = rows[0].lock().unwrap();
assert_eq!(20000, row.refresh_at);
assert_eq!(20000, row.warn_at);
assert_eq!(20000, row.expires_at);
assert_eq!(20100, row.scheduled_for);
assert_eq!(TokenState::ErrorPending, row.token_state);
assert_eq!(Some(20000), row.last_notification_at);
}
// t = 21000: simulate the retry succeeding; the row is healthy again.
clock.set(21000);
{
let mut row = rows[0].lock().unwrap();
row.refresh_at = clock.now() + 7500;
row.warn_at = clock.now() + 8500;
row.expires_at = clock.now() + 10000;
row.scheduled_for = clock.now() + 7500;
row.token_state = TokenState::Ok;
}
// Not due; the notification interval has elapsed, but an Ok row outside
// its warning window produces no warning.
scheduler.do_a_scheduling_round();
let msg = rx.try_recv();
assert_eq!(true, msg.is_err());
{
let row = rows[0].lock().unwrap();
assert_eq!(28500, row.refresh_at);
assert_eq!(29500, row.warn_at);
assert_eq!(31000, row.expires_at);
assert_eq!(28500, row.scheduled_for);
assert_eq!(TokenState::Ok, row.token_state);
assert_eq!(Some(20000), row.last_notification_at);
}
// t = 28500: back to normal operation; the next regular refresh is requested.
clock.set(28500);
scheduler.do_a_scheduling_round();
let msg = rx.try_recv().unwrap();
assert_eq!(ManagerCommand::ScheduledRefresh(0, 28500), msg);
{
let row = rows[0].lock().unwrap();
assert_eq!(28500, row.refresh_at);
assert_eq!(29500, row.warn_at);
assert_eq!(31000, row.expires_at);
assert_eq!(28500, row.scheduled_for);
assert_eq!(TokenState::OkPending, row.token_state);
assert_eq!(Some(20000), row.last_notification_at);
}
}
}