atex 0.3.0

Library for asynchronous local task evaluation (SQLite or in-memory)
Documentation
use easy_sqlite::errors::{DbError, DbResult};
use easy_sqlite::{IConnection, IDbRepo, IExecutor, INewDbRepo, ITable, TableManager};
use rusqlite::ToSql;

use crate::entities::error::AtexError;
use crate::entities::task::{NewRepoTask, Task, TaskId, TaskStatus};
use crate::impls::sqlite::tbl::DbTask;
use crate::proto::ITaskRepo;
use crate::utils::now;

/// Task repository that issues its SQL through the executor/connection `Ex`.
pub struct DbTaskRepo<Ex>
where
    Ex: IExecutor + IConnection,
{
    /// Executor used for every query this repository runs.
    pub ex: Ex,
}

// NOTE(review): both impls are unconditional — `Ex` is only bounded by
// `IExecutor + IConnection`, so `DbTaskRepo<Ex>` is declared `Send` and `Sync`
// for every executor type, including ones that are not themselves
// `Send`/`Sync`. Nothing in this file establishes the invariant that makes
// this sound; confirm that every `Ex` used with `DbTaskRepo` may be moved to
// and shared between threads.
unsafe impl<Ex: IExecutor + IConnection> Send for DbTaskRepo<Ex> {}
unsafe impl<Ex: IExecutor + IConnection> Sync for DbTaskRepo<Ex> {}

fn deser_task(row: &rusqlite::Row) -> DbResult<Task> {
    let err: String = row.get("error")?;
    let status: i8 = row.get("status")?;
    let status = match () {
        _ if status == DbTask::STATUS_ERR => TaskStatus::Error(err),
        _ if status == DbTask::STATUS_COMPLETED => TaskStatus::Completed,
        _ if status == DbTask::STATUS_PROGRESS => TaskStatus::InProgress(row.get("progress")?),
        _ => panic!("invalid status: {}", status),
    };
    Ok(Task {
        id: row.get("id")?,
        periodic_interval: row.get("periodic_interval")?,
        executor: row.get("executor")?,
        status,
        payload: row.get("payload")?,
        created: row.get("created")?,
        updated: row.get("updated")?,
        execute_after: row.get("execute_after")?,
        lock: row.get("lock")?,
    })
}

/// SQLite-backed implementation of [`ITaskRepo`].
///
/// Every method builds its SQL against the `DbTask::NAME` table and runs it
/// through `self.ex`; values are always passed as bound parameters.
impl<Ex: IExecutor + IConnection> ITaskRepo for DbTaskRepo<Ex> {
    /// Prepares the storage: creates a `TableManager` for `DbTask` over the
    /// executor and runs its `init` and `set_indexes` steps.
    fn init(&mut self) -> Result<(), AtexError> {
        let man = TableManager::<&Ex, DbTask>::create(&self.ex);
        man.init()?;
        man.set_indexes()?;
        Ok(())
    }

    /// Stores a new task and returns its id.
    ///
    /// If an unfinished task (status neither completed nor error) already
    /// holds the same `lock`, no row is inserted and the id of that task is
    /// returned instead. New rows start with `DbTask::STATUS_PROGRESS`, an
    /// empty error text and progress `0`.
    ///
    /// # Errors
    ///
    /// Propagates any error reported by the executor.
    fn create(&mut self, task: NewRepoTask) -> Result<TaskId, AtexError> {
        // Only tasks that are still active keep their lock; finished ones
        // (completed / failed) must not block the creation of a new task.
        // NOTE(review): presumably `lock` is optional — a NULL lock never
        // matches `lock = ?`, so such tasks are always inserted; confirm.
        let query = format!(
            r#"
            SELECT id FROM {t}
            WHERE lock = ? AND status NOT IN (?, ?)
            LIMIT 1"#,
            t = DbTask::NAME
        );
        let got = self.ex.get_many(
            &query,
            &[&task.lock, &DbTask::STATUS_COMPLETED, &DbTask::STATUS_ERR],
            |r| Ok(r.get(0)?),
        )?;
        if let Some(&existing) = got.first() {
            return Ok(existing);
        }
        let query = format!(
            r#"
            INSERT INTO {t}
            (periodic_interval, executor, error, status,
             payload, created, updated, execute_after,
             lock, progress)
            VALUES (?, ?, '', ?, ?, ?, ?, ?, ?, 0)
            "#,
            t = DbTask::NAME,
        );
        let id = self.ex.execute_return_id(
            &query,
            &[
                &task.periodic_interval,
                &task.executor,
                &DbTask::STATUS_PROGRESS,
                &task.payload,
                &task.created,
                &task.updated,
                &task.execute_after,
                &task.lock,
            ],
        )?;
        Ok(id as u64)
    }

    /// Loads the task with the given `id`.
    ///
    /// Returns `Ok(None)` when the executor reports `DbError::NotFound`;
    /// every other executor error is propagated.
    fn get_by_id(&self, id: TaskId) -> Result<Option<Task>, AtexError> {
        let query = format!(
            r#"
            SELECT
                id, periodic_interval, executor,
                error, status, payload, created,
                updated, execute_after, lock, progress
            FROM {t}
            WHERE id = ?
            "#,
            t = DbTask::NAME,
        );
        match self.ex.get_one(&query, &[&id], deser_task) {
            Ok(task) => Ok(Some(task)),
            Err(DbError::NotFound(_)) => Ok(None),
            Err(err) => Err(err.into()),
        }
    }

    /// Loads a task whose `executor` column equals `executor`.
    ///
    /// Returns `Ok(None)` when the executor reports `DbError::NotFound`;
    /// every other executor error is propagated. The query has no ordering,
    /// so which row is returned when several match is up to `get_one`.
    fn get_by_executor(&self, executor: &str) -> Result<Option<Task>, AtexError> {
        let query = format!(
            r#"
            SELECT
                id, periodic_interval, executor,
                error, status, payload, created,
                updated, execute_after, lock, progress
            FROM {t}
            WHERE executor = ?
            "#,
            t = DbTask::NAME,
        );
        match self.ex.get_one(&query, &[&executor], deser_task) {
            Ok(task) => Ok(Some(task)),
            Err(DbError::NotFound(_)) => Ok(None),
            Err(err) => Err(err.into()),
        }
    }

    /// Returns up to `limit` tasks that are due for execution.
    ///
    /// A task is due when its status is `DbTask::STATUS_PROGRESS` and its
    /// `execute_after` is not later than `now()`. Ids listed in `exclude` are
    /// skipped. Results are ordered by `execute_after`, earliest first.
    ///
    /// # Errors
    ///
    /// Propagates any error reported by the executor or by row decoding.
    fn get_ready(&self, exclude: &[TaskId], limit: usize) -> Result<Vec<Task>, AtexError> {
        // One `?` placeholder per excluded id; the clause is dropped entirely
        // for an empty list because `NOT IN ()` is not valid SQL.
        let q_exclude = if exclude.is_empty() {
            String::new()
        } else {
            format!(
                "AND id NOT IN ({tmpl})",
                tmpl = vec!["?"; exclude.len()].join(",")
            )
        };
        let query = format!(
            r#"
            SELECT
                id, periodic_interval, executor,
                error, status, payload, created,
                updated, execute_after, lock, progress
            FROM {t}
            WHERE status = ? AND execute_after <= ?
            {q_exclude}
            ORDER BY execute_after
            LIMIT ?
            "#,
            t = DbTask::NAME,
            q_exclude = q_exclude
        );
        let now_ts = now();
        // Parameter order must follow the placeholders: status, timestamp,
        // excluded ids, limit.
        let mut params: Vec<&dyn ToSql> = vec![&DbTask::STATUS_PROGRESS, &now_ts];
        params.extend(exclude.iter().map(|id| id as &dyn ToSql));
        params.push(&limit);
        let res = self.ex.get_many(&query, &params, deser_task)?;
        Ok(res)
    }

    /// Writes `task_status` to the task with the given `id` and refreshes its
    /// `updated` timestamp.
    ///
    /// The status is stored as three columns:
    /// * `Error(msg)` → `STATUS_ERR`, error text `msg`, progress `0`
    /// * `InProgress(p)` → `STATUS_PROGRESS`, empty error text, progress `p`
    /// * `Completed` → `STATUS_COMPLETED`, empty error text, progress `100`
    fn set_status(&mut self, id: TaskId, task_status: TaskStatus) -> Result<(), AtexError> {
        let (status, err, progress) = match task_status {
            TaskStatus::Error(val) => (DbTask::STATUS_ERR, val, 0),
            TaskStatus::InProgress(val) => (DbTask::STATUS_PROGRESS, String::new(), val),
            TaskStatus::Completed => (DbTask::STATUS_COMPLETED, String::new(), 100),
        };
        let query = format!(
            "UPDATE {t} SET status = ?, error = ?, progress = ?, updated = ? WHERE id = ?",
            t = DbTask::NAME
        );
        self.ex
            .execute(&query, &[&status, &err, &progress, &now(), &id])?;
        Ok(())
    }

    /// Stores `progress` for the task with the given `id` and refreshes its
    /// `updated` timestamp.
    ///
    /// Values above `DbTask::MAX_PROGRESS` are clamped to that maximum.
    fn set_progress(&mut self, id: TaskId, progress: u8) -> Result<(), AtexError> {
        // Convert with saturation first: a plain `as i8` cast would wrap
        // values >= 128 to negative numbers and slip past the clamp.
        let progress = i8::try_from(progress)
            .unwrap_or(i8::MAX)
            .min(DbTask::MAX_PROGRESS);
        let query = format!(
            "UPDATE {t} SET progress = ?, updated = ? WHERE id = ?",
            t = DbTask::NAME
        );
        self.ex.execute(&query, &[&progress, &now(), &id])?;
        Ok(())
    }

    /// Reschedules the task with the given `id` to run no earlier than `ts`.
    ///
    /// The task is put back into `DbTask::STATUS_PROGRESS` (the status
    /// `get_ready` selects on), its progress is reset to
    /// `DbTask::MIN_PROGRESS`, and `updated` is refreshed.
    fn set_execute_after(&mut self, id: TaskId, ts: u64) -> Result<(), AtexError> {
        // The status column takes a status code and the progress column a
        // progress value; each constant is bound to its own column.
        let query = format!(
            "UPDATE {t} SET execute_after = ?, status = ?, progress = ?, updated = ? WHERE id = ?",
            t = DbTask::NAME
        );
        self.ex.execute(
            &query,
            &[
                &ts,
                &DbTask::STATUS_PROGRESS,
                &DbTask::MIN_PROGRESS,
                &now(),
                &id,
            ],
        )?;
        Ok(())
    }

    /// Deletes completed tasks whose `updated` timestamp is older than
    /// `now() - max_lifetime`.
    ///
    /// When `max_lifetime` exceeds the current timestamp the threshold
    /// saturates at `0`, so nothing is deleted.
    fn delete_completed_tasks(&mut self, max_lifetime: u64) -> Result<(), AtexError> {
        let query = format!(
            "DELETE FROM {t} WHERE status = ? AND updated < ?",
            t = DbTask::NAME,
        );
        // Saturate instead of underflowing on an oversized lifetime.
        let threshold = now().saturating_sub(max_lifetime);
        self.ex
            .execute(&query, &[&DbTask::STATUS_COMPLETED, &threshold])?;
        Ok(())
    }

    /// Deletes failed tasks whose `updated` timestamp is older than
    /// `now() - max_lifetime`.
    ///
    /// When `max_lifetime` exceeds the current timestamp the threshold
    /// saturates at `0`, so nothing is deleted.
    fn delete_error_tasks(&mut self, max_lifetime: u64) -> Result<(), AtexError> {
        let query = format!(
            "DELETE FROM {t} WHERE status = ? AND updated < ?",
            t = DbTask::NAME,
        );
        // Saturate instead of underflowing on an oversized lifetime.
        let threshold = now().saturating_sub(max_lifetime);
        self.ex
            .execute(&query, &[&DbTask::STATUS_ERR, &threshold])?;
        Ok(())
    }
}