atex 0.3.0

Lib for async local task evaluation(sqlite or in-memory)
Documentation
use std::collections::{BTreeMap, BTreeSet};

use crate::entities::error::AtexError;
use crate::entities::task::{NewRepoTask, Task, TaskId, TaskStatus};
use crate::proto::ITaskRepo;
use crate::utils::now;

#[derive(Default)]
pub struct InMemRepo {
    tasks: BTreeMap<TaskId, Task>,
    in_progress_index: BTreeSet<TaskId>,
    completed_index: BTreeSet<TaskId>,
    err_index: BTreeSet<TaskId>,
    lock_index: BTreeMap<String, TaskId>,
    last_id: TaskId,
}

impl InMemRepo {
    fn next_id(&mut self) -> TaskId {
        loop {
            let id = self.inc_last_id();
            if self.tasks.get(&id).is_none() {
                return id;
            }
        }
    }

    fn inc_last_id(&mut self) -> TaskId {
        let id = if self.last_id == TaskId::MAX {
            0
        } else {
            self.last_id + 1
        };
        self.last_id = id;
        id
    }
}

impl ITaskRepo for InMemRepo {
    fn init(&mut self) -> Result<(), AtexError> {
        Ok(())
    }

    fn create(&mut self, task: NewRepoTask) -> Result<TaskId, AtexError> {
        if let Some(id) = self.lock_index.get(&task.lock) {
            return Ok(*id);
        }
        let id = self.next_id();
        let prev = self.tasks.insert(
            id,
            Task {
                id,
                periodic_interval: task.periodic_interval,
                executor: task.executor,
                status: TaskStatus::InProgress(0),
                payload: task.payload,
                created: task.created,
                updated: task.updated,
                execute_after: task.execute_after,
                lock: task.lock.clone(),
            },
        );
        self.in_progress_index.insert(id);
        self.lock_index.insert(task.lock, id);
        if prev.is_some() {
            panic!("Insert conflict")
        }
        Ok(id)
    }

    fn get_by_id(&self, id: TaskId) -> Result<Option<Task>, AtexError> {
        Ok(self.tasks.get(&id).cloned())
    }

    fn get_by_executor(&self, executor: &str) -> Result<Option<Task>, AtexError> {
        for (_, task) in self.tasks.iter() {
            if task.executor == executor {
                return Ok(Some(task.clone()));
            }
        }
        Ok(None)
    }

    fn get_ready(&self, exclude: &[TaskId], limit: usize) -> Result<Vec<Task>, AtexError> {
        let mut result = Vec::new();
        let now_ts = now();
        for key in self.in_progress_index.iter() {
            if exclude.contains(key) {
                continue;
            }
            let task = match self.tasks.get(key) {
                Some(val) => val,
                _ => panic!("Index in_progress failed(not found)"),
            };
            if !matches!(task.status, TaskStatus::InProgress(_)) {
                panic!("Index in_progress failed(status missmatch)")
            }
            if task.execute_after > now_ts {
                continue;
            }
            result.push(task.clone());
            if result.len() >= limit {
                break;
            }
        }
        Ok(result)
    }

    fn set_status(&mut self, id: TaskId, task_status: TaskStatus) -> Result<(), AtexError> {
        let (to_put, to_del1, to_del2, save_lock) = match task_status {
            TaskStatus::Error(_) => (
                &mut self.err_index,
                &mut self.in_progress_index,
                &mut self.completed_index,
                false,
            ),
            TaskStatus::InProgress(_) => (
                &mut self.in_progress_index,
                &mut self.err_index,
                &mut self.completed_index,
                true,
            ),
            TaskStatus::Completed => (
                &mut self.completed_index,
                &mut self.in_progress_index,
                &mut self.err_index,
                false,
            ),
        };
        let task = match self.tasks.get_mut(&id) {
            Some(val) => val,
            _ => return Ok(()),
        };
        task.status = task_status;
        task.updated = now();
        to_put.insert(id);
        to_del1.remove(&id);
        to_del2.remove(&id);
        if save_lock {
            self.lock_index.insert(task.lock.clone(), id);
        } else {
            self.lock_index.remove(&task.lock);
        }
        Ok(())
    }

    fn set_progress(&mut self, id: TaskId, progress: u8) -> Result<(), AtexError> {
        self.set_status(id, TaskStatus::InProgress(progress.min(99)))
    }

    fn set_execute_after(&mut self, id: TaskId, ts: u64) -> Result<(), AtexError> {
        if let Some(task) = self.tasks.get_mut(&id) {
            task.execute_after = ts;
            task.status = TaskStatus::InProgress(0);
            task.updated = now();
        }
        Ok(())
    }

    fn delete_completed_tasks(&mut self, max_lifetime: u64) -> Result<(), AtexError> {
        delete_tasks(
            &mut self.tasks,
            &mut self.completed_index,
            |s| matches!(s, TaskStatus::Completed),
            max_lifetime,
            "completed",
        );
        Ok(())
    }

    fn delete_error_tasks(&mut self, max_lifetime: u64) -> Result<(), AtexError> {
        delete_tasks(
            &mut self.tasks,
            &mut self.err_index,
            |s| matches!(s, TaskStatus::Error(_)),
            max_lifetime,
            "err",
        );
        Ok(())
    }
}

#[inline(always)]
fn delete_tasks<F: Fn(&TaskStatus) -> bool>(
    tasks: &mut BTreeMap<TaskId, Task>,
    index: &mut BTreeSet<TaskId>,
    status_check: F,
    max_lifetime: u64,
    index_name: &str,
) {
    let mut id_list = Vec::new();
    let threshold = now() - max_lifetime;
    for id in index.iter() {
        let task = match tasks.get(id) {
            Some(val) => val,
            _ => panic!("Index {} failed(not found)", index_name),
        };
        if !status_check(&task.status) {
            panic!("Index {} failed(status missmatch)", index_name)
        }
        if task.updated < threshold {
            id_list.push(*id);
        }
    }
    for id in id_list {
        tasks.remove(&id);
        index.remove(&id);
    }
}