use std::collections::BTreeMap;
use time::{Duration, OffsetDateTime};
use uuid::Uuid;
use super::SchedulerError;
use crate::db;
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct Config {
pub min_fetch_interval: Duration,
pub max_fetch_interval: Duration,
}
impl Default for Config {
fn default() -> Self {
Self {
min_fetch_interval: Duration::minutes(5),
max_fetch_interval: Duration::hours(12),
}
}
}
#[derive(Debug)]
pub(crate) struct Calender {
calendar: BTreeMap<OffsetDateTime, Vec<Uuid>>,
config: Config,
}
impl Calender {
pub fn new(config: Config) -> Self {
Self {
calendar: BTreeMap::new(),
config,
}
}
pub(crate) fn next_time(&self) -> Option<OffsetDateTime> {
self.calendar.keys().next().copied()
}
#[tracing::instrument(skip(self), level = tracing::Level::TRACE)]
pub fn pop_due(&mut self, until_time: OffsetDateTime) -> Vec<Uuid> {
let mut due_tasks = vec![];
let keys_to_remove = self
.calendar
.range(..=until_time)
.map(|(k, _)| *k)
.collect::<Vec<OffsetDateTime>>();
for key in keys_to_remove {
if let Some(mut tasks) = self.calendar.remove(&key) {
due_tasks.append(&mut tasks);
}
}
due_tasks
}
#[tracing::instrument(skip(self, rng), level = tracing::Level::DEBUG)]
pub(crate) async fn schedule_source(
&mut self,
source: Uuid,
rng: &mut impl rand::Rng,
) -> Result<(), db::StorageError> {
use rand::RngExt as _;
let now = time::OffsetDateTime::now_utc();
let time_window = self
.config
.max_fetch_interval
.saturating_sub(self.config.min_fetch_interval)
.whole_milliseconds();
let random_offset = if time_window > 0 {
rng.random_range(0..=time_window)
} else {
0
};
let scheduled_time = now + self.config.min_fetch_interval + Duration::milliseconds(random_offset.try_into()?);
self.calendar.entry(scheduled_time).or_default().push(source);
Ok(())
}
#[tracing::instrument(skip(self), level = tracing::Level::DEBUG)]
pub(crate) fn unschedule_fetch(&mut self, source: Uuid) -> Result<(), SchedulerError> {
let entry = self
.calendar
.iter()
.find_map(|entry| entry.1.iter().position(|x| x == &source).map(|index| (*entry.0, index)))
.ok_or(SchedulerError::SourceNotScheduled(source))?;
self.calendar
.get_mut(&entry.0)
.ok_or(SchedulerError::SourceNotScheduled(source))?
.remove(entry.1);
Ok(())
}
}