1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
use crate::{progress, unit};
use std::collections::VecDeque;
use std::sync::atomic::Ordering;
use std::time::{Duration, SystemTime};

// Minimum amount of observed time that has to pass since the last recomputation before
// `State::update` computes a fresh throughput value.
const THROTTLE_INTERVAL: Duration = Duration::from_secs(1);
// Length of the sample window retained by `State::compute_throughput`, and the timespan the
// reported throughput refers to.
const ONCE_A_SECOND: Duration = Duration::from_secs(1);

/// Per-task bookkeeping used to derive a throughput from successive step values.
#[derive(Clone, Eq, PartialEq, Ord, PartialOrd, Debug)]
struct State {
    /// Sum of all `elapsed` durations passed to `new` and `update` so far.
    observed: Duration,
    /// The step value seen in the most recent call to `new` or `update`.
    last_value: progress::Step,
    /// Samples of `(elapsed, steps)`: the first one holds the value passed to `new`, every later one
    /// holds the (saturating) difference to the previously seen value. Old samples are dropped from
    /// the front when the throughput is computed.
    elapsed_values: VecDeque<(Duration, progress::Step)>,

    /// The value of `observed` at the time the throughput was last recomputed (initially the `elapsed`
    /// passed to `new`).
    last_update_duration: Duration,
    /// The most recently computed throughput, or `None` if none was computed yet.
    precomputed_throughput: Option<progress::Step>,
}

impl State {
    /// Create the state of a task that is seen for the first time with step `value`, `elapsed` after
    /// the previous frame. No throughput is available until enough time was observed.
    fn new(value: progress::Step, elapsed: Duration) -> Self {
        // Capacity matches the default frames per second.
        let mut samples = VecDeque::with_capacity(6);
        samples.push_back((elapsed, value));

        State {
            observed: elapsed,
            last_value: value,
            elapsed_values: samples,
            last_update_duration: elapsed,
            precomputed_throughput: None,
        }
    }

    /// Drop the oldest samples for as long as the remaining ones still span more than
    /// `ONCE_A_SECOND`, then return the steps of the retained samples scaled to `ONCE_A_SECOND`.
    fn compute_throughput(&mut self) -> progress::Step {
        let mut window: Duration = self.elapsed_values.iter().map(|&(took, _)| took).sum();
        while window > ONCE_A_SECOND {
            let oldest = match self.elapsed_values.front() {
                Some(&(took, _)) => took,
                None => break,
            };
            // Keep the oldest sample if removing it would shrink the window to a second or less.
            let without_oldest = window.checked_sub(oldest).unwrap_or_default();
            if without_oldest <= ONCE_A_SECOND {
                break;
            }
            window -= oldest;
            self.elapsed_values.pop_front();
        }

        let steps_in_window: progress::Step = self.elapsed_values.iter().map(|&(_, steps)| steps).sum();
        let steps_per_second = steps_in_window as f64 / window.as_secs_f64();
        (steps_per_second * ONCE_A_SECOND.as_secs_f64()) as progress::Step
    }

    /// Record that the task reached `value` after another `elapsed` time and return the current
    /// throughput, which is recomputed only if more than `THROTTLE_INTERVAL` was observed since the
    /// last recomputation.
    fn update(&mut self, value: progress::Step, elapsed: Duration) -> Option<unit::display::Throughput> {
        self.observed += elapsed;
        let steps_done = value.saturating_sub(self.last_value);
        self.elapsed_values.push_back((elapsed, steps_done));
        self.last_value = value;

        let since_last_recompute = self.observed - self.last_update_duration;
        if since_last_recompute > THROTTLE_INTERVAL {
            let fresh = self.compute_throughput();
            self.precomputed_throughput = Some(fresh);
            self.last_update_duration = self.observed;
        }
        self.throughput()
    }

    /// Return the last computed throughput expressed per `ONCE_A_SECOND`, or `None` if there is none yet.
    fn throughput(&self) -> Option<unit::display::Throughput> {
        let value_change_in_timespan = self.precomputed_throughput?;
        Some(unit::display::Throughput {
            value_change_in_timespan,
            timespan: ONCE_A_SECOND,
        })
    }
}

/// A utility to compute throughput of a set of progress values usually available to a renderer.
#[derive(Default)]
pub struct Throughput {
    /// The state of each known progress key, kept sorted by key to allow lookups by binary search.
    sorted_by_key: Vec<(progress::Key, State)>,
    /// The time of the most recent call to `update_elapsed()`, or `None` if it was never called.
    updated_at: Option<SystemTime>,
    /// The time that passed between the two most recent calls to `update_elapsed()`. It is `None` until
    /// the second call, and whenever the duration since the previous call could not be determined.
    elapsed: Option<Duration>,
}

impl Throughput {
    /// To be called when a renderer starts drawing, to record how much time passed since the previous
    /// call. That duration is what subsequent calls to
    /// [`update_and_get(…)`][Throughput::update_and_get()] attribute to the progress they observe.
    pub fn update_elapsed(&mut self) {
        let now = SystemTime::now();
        let previous = self.updated_at.replace(now);
        self.elapsed = previous.and_then(|then| now.duration_since(then).ok());
    }

    /// Set the current `progress` of the value at `key`, creating its state if it is not yet known,
    /// and return its computed throughput.
    ///
    /// Returns `None` if `progress` is `None`, if no elapsed time is available yet, or if no
    /// throughput was computed for `key` so far.
    pub fn update_and_get(
        &mut self,
        key: &progress::Key,
        progress: Option<&progress::Value>,
    ) -> Option<unit::display::Throughput> {
        let progress = progress?;
        let elapsed = self.elapsed?;
        let step = progress.step.load(Ordering::SeqCst);

        match self.sorted_by_key.binary_search_by_key(key, |(known, _)| *known) {
            Ok(position) => {
                let (_, state) = &mut self.sorted_by_key[position];
                state.update(step, elapsed)
            }
            Err(insert_at) => {
                // Insert at the position reported by the search to keep the list sorted.
                let state = State::new(step, elapsed);
                let throughput = state.throughput();
                self.sorted_by_key.insert(insert_at, (*key, state));
                throughput
            }
        }
    }

    /// Drop the state of every key that is not present in `sorted_values`, which must be sorted by key.
    ///
    /// Call this after [`update_and_get(…)`][Throughput::update_and_get()] so that removed or finished
    /// progress is forgotten.
    pub fn reconcile(&mut self, sorted_values: &[(progress::Key, progress::Task)]) {
        self.sorted_by_key.retain(|entry| {
            let key = &entry.0;
            sorted_values
                .binary_search_by_key(key, |(present, _)| *present)
                .is_ok()
        });
    }
}