use std::collections::{HashMap, VecDeque};
use arrayvec::ArrayVec;
use chrono::{DateTime, Duration, Utc};
use crate::prelude::{RemoteMovieId, RemoteSeriesId, SeriesId, TaskId};
const DELAY_MILLIS: i64 = 250;
const CAPACITY: usize = 50;
#[derive(Debug, Clone, Copy)]
pub(crate) enum TaskStatus {
Pending,
Running,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(crate) enum TaskRef {
SeriesId { series_id: SeriesId },
RemoteSeriesId { remote_id: RemoteSeriesId },
RemoteMovieId { remote_id: RemoteMovieId },
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub(crate) enum TaskKind {
CheckForUpdates {
series_id: SeriesId,
remote_id: RemoteSeriesId,
},
DownloadSeries {
series_id: SeriesId,
remote_id: RemoteSeriesId,
last_modified: Option<DateTime<Utc>>,
force: bool,
},
DownloadSeriesByRemoteId { remote_id: RemoteSeriesId },
#[allow(unused)]
DownloadMovieByRemoteId { remote_id: RemoteMovieId },
}
impl TaskKind {
pub(crate) fn task_ids(&self) -> ArrayVec<TaskRef, 2> {
let mut ids = ArrayVec::new();
match *self {
TaskKind::CheckForUpdates {
series_id,
remote_id,
..
} => {
ids.push(TaskRef::SeriesId { series_id });
ids.push(TaskRef::RemoteSeriesId { remote_id });
}
TaskKind::DownloadSeries {
series_id,
remote_id,
..
} => {
ids.push(TaskRef::SeriesId { series_id });
ids.push(TaskRef::RemoteSeriesId { remote_id });
}
TaskKind::DownloadSeriesByRemoteId { remote_id, .. } => {
ids.push(TaskRef::RemoteSeriesId { remote_id });
}
TaskKind::DownloadMovieByRemoteId { remote_id } => {
ids.push(TaskRef::RemoteMovieId { remote_id });
}
}
ids
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
#[must_use]
pub(crate) struct Task {
pub(crate) id: TaskId,
pub(crate) kind: TaskKind,
pub(crate) scheduled: Option<DateTime<Utc>>,
}
impl Task {
pub(crate) fn is_series(&self, id: &SeriesId) -> bool {
match &self.kind {
TaskKind::DownloadSeries { series_id, .. } => *series_id == *id,
TaskKind::CheckForUpdates { series_id, .. } => *series_id == *id,
TaskKind::DownloadSeriesByRemoteId { .. } => false,
TaskKind::DownloadMovieByRemoteId { .. } => false,
}
}
}
#[derive(Default)]
pub(crate) struct Queue {
status: HashMap<TaskId, TaskStatus>,
task_ids: HashMap<TaskRef, TaskId>,
data: VecDeque<Task>,
running: Vec<Task>,
modified: bool,
}
impl Queue {
#[inline]
pub(crate) fn status(&self, id: TaskRef) -> Option<TaskStatus> {
let id = self.task_ids.get(&id)?;
self.status.get(&id).copied()
}
#[inline]
pub(crate) fn complete(&mut self, task: &Task) -> Option<TaskStatus> {
self.running.retain(|t| t.id != task.id);
let status = self.status.remove(&task.id)?;
for id in task.kind.task_ids() {
let _ = self.task_ids.remove(&id);
}
Some(status)
}
#[inline]
pub(crate) fn running(&self) -> impl ExactSizeIterator<Item = &Task> {
self.running.iter()
}
#[inline]
pub(crate) fn pending(&self) -> impl ExactSizeIterator<Item = &Task> {
self.data.iter()
}
pub(crate) fn remove_tasks_by<P>(&mut self, mut predicate: P) -> usize
where
P: FnMut(&Task) -> bool,
{
let mut removed = 0;
for data in &self.data {
if predicate(data) {
let _ = self.status.remove(&data.id);
for task_id in data.kind.task_ids() {
let _ = self.task_ids.remove(&task_id);
}
removed += 1;
}
}
self.data.retain(move |task| !predicate(task));
self.modified |= removed > 0;
removed
}
#[inline]
pub(crate) fn take_modified(&mut self) -> bool {
std::mem::take(&mut self.modified)
}
pub(crate) fn next_task(
&mut self,
now: &DateTime<Utc>,
timed_out: Option<TaskId>,
) -> Option<Task> {
let task = self.data.front()?;
if !matches!(timed_out, Some(id) if id == task.id)
&& task.scheduled.map(|s| s > *now).unwrap_or_default()
{
return None;
}
let task = self.data.pop_front()?;
self.status.insert(task.id, TaskStatus::Running);
self.running.push(task.clone());
Some(task)
}
pub(crate) fn next_sleep(&self, now: &DateTime<Utc>) -> Option<(u64, TaskId)> {
let task = self.data.front()?;
let id = task.id;
let Some(scheduled) = &task.scheduled else {
return Some((0, id));
};
let seconds =
u64::try_from(scheduled.signed_duration_since(*now).num_seconds().max(0)).ok()?;
Some((seconds, id))
}
pub(crate) fn push_without_delay(&mut self, kind: TaskKind) -> bool {
let task_ids = kind.task_ids();
for task_id in &task_ids {
if self.task_ids.contains_key(task_id) {
return false;
}
}
let id = TaskId::random();
self.status.insert(id, TaskStatus::Pending);
for task_id in task_ids {
self.task_ids.insert(task_id, id);
}
self.data.push_back(Task {
id,
kind,
scheduled: None,
});
self.modified = true;
true
}
pub(crate) fn push(&mut self, kind: TaskKind) -> bool {
let task_ids = kind.task_ids();
for task_id in &task_ids {
if self.task_ids.contains_key(task_id) {
return false;
}
}
let id = TaskId::random();
for task_id in task_ids {
self.task_ids.insert(task_id, id);
}
self.status.insert(id, TaskStatus::Pending);
let scheduled = self
.data
.iter()
.flat_map(|t| t.scheduled)
.next_back()
.unwrap_or_else(Utc::now)
+ Duration::milliseconds(DELAY_MILLIS);
self.data.push_back(Task {
id,
kind,
scheduled: Some(scheduled),
});
self.modified = true;
true
}
pub(crate) fn at_soft_capacity(&self) -> bool {
self.data.len() + self.running.len() >= CAPACITY
}
}