use std::collections::{BTreeMap, BTreeSet};
use crate::entities::error::AtexError;
use crate::entities::task::{NewRepoTask, Task, TaskId, TaskStatus};
use crate::proto::ITaskRepo;
use crate::utils::now;
/// Task repository that keeps every task and all of its lookup indexes in
/// process memory.
///
/// The three status indexes and `lock_index` mirror information stored in
/// `tasks`; the repository methods in this file update them alongside the task
/// records and panic when they find an index entry that disagrees with its task.
#[derive(Default)]
pub struct InMemRepo {
/// Every stored task, keyed by its id.
tasks: BTreeMap<TaskId, Task>,
/// Ids of tasks expected to be in `TaskStatus::InProgress`; scanned by `get_ready`.
in_progress_index: BTreeSet<TaskId>,
/// Ids of tasks expected to be in `TaskStatus::Completed`; scanned by `delete_completed_tasks`.
completed_index: BTreeSet<TaskId>,
/// Ids of tasks expected to be in `TaskStatus::Error`; scanned by `delete_error_tasks`.
err_index: BTreeSet<TaskId>,
/// Maps a lock string to the id of the task registered under it; `create`
/// returns that id instead of storing a second task with the same lock.
lock_index: BTreeMap<String, TaskId>,
/// Most recently generated id; `Default` starts it at zero and `inc_last_id` advances it.
last_id: TaskId,
}
impl InMemRepo {
fn next_id(&mut self) -> TaskId {
loop {
let id = self.inc_last_id();
if self.tasks.get(&id).is_none() {
return id;
}
}
}
fn inc_last_id(&mut self) -> TaskId {
let id = if self.last_id == TaskId::MAX {
0
} else {
self.last_id + 1
};
self.last_id = id;
id
}
}
/// [`ITaskRepo`] implementation that serves every request from the in-memory
/// collections of [`InMemRepo`].
///
/// Each mutating method keeps `in_progress_index`, `completed_index`,
/// `err_index` and `lock_index` consistent with the task records; readers that
/// walk an index panic if they find an entry that contradicts its task.
impl ITaskRepo for InMemRepo {
    /// Nothing to prepare for an in-memory store; always succeeds.
    fn init(&mut self) -> Result<(), AtexError> {
        Ok(())
    }

    /// Stores `task` and returns the id assigned to it.
    ///
    /// If `task.lock` is already registered in `lock_index`, nothing is stored
    /// and the id of the task holding that lock is returned instead. A newly
    /// stored task starts as `TaskStatus::InProgress(0)` and takes the lock.
    ///
    /// # Panics
    /// Panics with "Insert conflict" if the generated id was already present.
    fn create(&mut self, task: NewRepoTask) -> Result<TaskId, AtexError> {
        if let Some(&holder) = self.lock_index.get(&task.lock) {
            return Ok(holder);
        }
        let id = self.next_id();
        let record = Task {
            id,
            periodic_interval: task.periodic_interval,
            executor: task.executor,
            status: TaskStatus::InProgress(0),
            payload: task.payload,
            created: task.created,
            updated: task.updated,
            execute_after: task.execute_after,
            lock: task.lock.clone(),
        };
        // `next_id` only hands out unused ids, so displacing an entry means the
        // repository state is corrupted.
        if self.tasks.insert(id, record).is_some() {
            panic!("Insert conflict")
        }
        self.in_progress_index.insert(id);
        self.lock_index.insert(task.lock, id);
        Ok(id)
    }

    /// Returns a copy of the task stored under `id`, or `None` if there is none.
    fn get_by_id(&self, id: TaskId) -> Result<Option<Task>, AtexError> {
        Ok(self.tasks.get(&id).cloned())
    }

    /// Returns a copy of the first task, in ascending id order, whose
    /// `executor` equals the given name, or `None` if no task matches.
    fn get_by_executor(&self, executor: &str) -> Result<Option<Task>, AtexError> {
        Ok(self
            .tasks
            .values()
            .find(|task| task.executor == executor)
            .cloned())
    }

    /// Collects up to `limit` in-progress tasks whose `execute_after` is not
    /// later than `now()`, skipping every id listed in `exclude`.
    ///
    /// Tasks are visited in ascending id order. A `limit` of zero yields an
    /// empty list.
    ///
    /// # Panics
    /// Panics if `in_progress_index` names an id that is missing from `tasks`
    /// or whose task is not `TaskStatus::InProgress`.
    fn get_ready(&self, exclude: &[TaskId], limit: usize) -> Result<Vec<Task>, AtexError> {
        let mut ready = Vec::new();
        // Without this guard the push-then-check loop below would still hand
        // out one task when the caller asked for none.
        if limit == 0 {
            return Ok(ready);
        }
        let now_ts = now();
        for id in &self.in_progress_index {
            if exclude.contains(id) {
                continue;
            }
            let task = self
                .tasks
                .get(id)
                .expect("Index in_progress failed(not found)");
            assert!(
                matches!(task.status, TaskStatus::InProgress(_)),
                "Index in_progress failed(status missmatch)"
            );
            if task.execute_after > now_ts {
                continue;
            }
            ready.push(task.clone());
            if ready.len() >= limit {
                break;
            }
        }
        Ok(ready)
    }

    /// Sets the status of task `id`, stamps `updated` with `now()` and moves
    /// the id into the index that matches the new status.
    ///
    /// An in-progress status (re)registers the task's lock. Any other status
    /// releases the lock, but only while it is still registered to this task,
    /// so a lock that has since been taken by another task stays in place.
    /// Unknown ids are ignored.
    fn set_status(&mut self, id: TaskId, task_status: TaskStatus) -> Result<(), AtexError> {
        let task = match self.tasks.get_mut(&id) {
            Some(task) => task,
            None => return Ok(()),
        };
        let (target, stale_a, stale_b, holds_lock) = match task_status {
            TaskStatus::InProgress(_) => (
                &mut self.in_progress_index,
                &mut self.err_index,
                &mut self.completed_index,
                true,
            ),
            TaskStatus::Completed => (
                &mut self.completed_index,
                &mut self.in_progress_index,
                &mut self.err_index,
                false,
            ),
            TaskStatus::Error(_) => (
                &mut self.err_index,
                &mut self.in_progress_index,
                &mut self.completed_index,
                false,
            ),
        };
        task.status = task_status;
        task.updated = now();
        target.insert(id);
        stale_a.remove(&id);
        stale_b.remove(&id);
        if holds_lock {
            self.lock_index.insert(task.lock.clone(), id);
        } else if self.lock_index.get(&task.lock) == Some(&id) {
            // Only drop the entry this task owns; the same lock string may
            // already belong to a task created after this one finished.
            self.lock_index.remove(&task.lock);
        }
        Ok(())
    }

    /// Marks task `id` as in progress with the given progress value, capped at 99.
    fn set_progress(&mut self, id: TaskId, progress: u8) -> Result<(), AtexError> {
        self.set_status(id, TaskStatus::InProgress(progress.min(99)))
    }

    /// Reschedules task `id` to run no earlier than `ts` and resets it to
    /// `TaskStatus::InProgress(0)`.
    ///
    /// The status change goes through [`Self::set_status`], so a task that was
    /// completed or failed is moved back into `in_progress_index` and takes
    /// its lock again. Unknown ids are ignored.
    fn set_execute_after(&mut self, id: TaskId, ts: u64) -> Result<(), AtexError> {
        match self.tasks.get_mut(&id) {
            Some(task) => task.execute_after = ts,
            None => return Ok(()),
        }
        // Assigning the status field directly would leave the id in its old
        // index, hiding the task from `get_ready` and tripping the index
        // consistency panics later on.
        self.set_status(id, TaskStatus::InProgress(0))
    }

    /// Removes completed tasks whose `updated` timestamp lies more than
    /// `max_lifetime` before `now()`.
    ///
    /// # Panics
    /// Panics if `completed_index` disagrees with the stored tasks.
    fn delete_completed_tasks(&mut self, max_lifetime: u64) -> Result<(), AtexError> {
        delete_tasks(
            &mut self.tasks,
            &mut self.completed_index,
            |s| matches!(s, TaskStatus::Completed),
            max_lifetime,
            "completed",
        );
        Ok(())
    }

    /// Removes failed tasks whose `updated` timestamp lies more than
    /// `max_lifetime` before `now()`.
    ///
    /// # Panics
    /// Panics if `err_index` disagrees with the stored tasks.
    fn delete_error_tasks(&mut self, max_lifetime: u64) -> Result<(), AtexError> {
        delete_tasks(
            &mut self.tasks,
            &mut self.err_index,
            |s| matches!(s, TaskStatus::Error(_)),
            max_lifetime,
            "err",
        );
        Ok(())
    }
}
/// Removes from `tasks` and `index` every indexed task whose `updated`
/// timestamp is older than `now() - max_lifetime`.
///
/// * `tasks` - the task store the index refers to.
/// * `index` - set of ids that are all expected to satisfy `status_check`.
/// * `status_check` - predicate every indexed task's status must pass.
/// * `max_lifetime` - how far before `now()` a task's `updated` may lie before
///   it is removed; the cut-off saturates at zero, so a lifetime larger than
///   the current timestamp removes nothing.
/// * `index_name` - label used in panic messages.
///
/// The whole index is validated before anything is removed.
///
/// # Panics
/// Panics if an indexed id is missing from `tasks` or its task fails
/// `status_check`.
fn delete_tasks<F: Fn(&TaskStatus) -> bool>(
    tasks: &mut BTreeMap<TaskId, Task>,
    index: &mut BTreeSet<TaskId>,
    status_check: F,
    max_lifetime: u64,
    index_name: &str,
) {
    // A plain subtraction underflows when `max_lifetime` exceeds the current
    // timestamp: a panic in debug builds, a wrapped (huge) cut-off in release.
    let threshold = now().saturating_sub(max_lifetime);
    let expired: Vec<TaskId> = index
        .iter()
        .filter(|id| {
            let task = tasks
                .get(*id)
                .unwrap_or_else(|| panic!("Index {} failed(not found)", index_name));
            assert!(
                status_check(&task.status),
                "Index {} failed(status missmatch)",
                index_name
            );
            task.updated < threshold
        })
        .copied()
        .collect();
    for id in expired {
        tasks.remove(&id);
        index.remove(&id);
    }
}