use easy_sqlite::errors::{DbError, DbResult};
use easy_sqlite::{IConnection, IDbRepo, IExecutor, INewDbRepo, ITable, TableManager};
use rusqlite::ToSql;
use crate::entities::error::AtexError;
use crate::entities::task::{NewRepoTask, Task, TaskId, TaskStatus};
use crate::impls::sqlite::tbl::DbTask;
use crate::proto::ITaskRepo;
use crate::utils::now;
pub struct DbTaskRepo<Ex: IExecutor + IConnection> {
pub ex: Ex,
}
unsafe impl<Ex: IExecutor + IConnection> Send for DbTaskRepo<Ex> {}
unsafe impl<Ex: IExecutor + IConnection> Sync for DbTaskRepo<Ex> {}
fn deser_task(row: &rusqlite::Row) -> DbResult<Task> {
let err: String = row.get("error")?;
let status: i8 = row.get("status")?;
let status = match () {
_ if status == DbTask::STATUS_ERR => TaskStatus::Error(err),
_ if status == DbTask::STATUS_COMPLETED => TaskStatus::Completed,
_ if status == DbTask::STATUS_PROGRESS => TaskStatus::InProgress(row.get("progress")?),
_ => panic!("invalid status: {}", status),
};
Ok(Task {
id: row.get("id")?,
periodic_interval: row.get("periodic_interval")?,
executor: row.get("executor")?,
status,
payload: row.get("payload")?,
created: row.get("created")?,
updated: row.get("updated")?,
execute_after: row.get("execute_after")?,
lock: row.get("lock")?,
})
}
impl<Ex: IExecutor + IConnection> ITaskRepo for DbTaskRepo<Ex> {
fn init(&mut self) -> Result<(), AtexError> {
let man = TableManager::<&Ex, DbTask>::create(&self.ex);
man.init()?;
man.set_indexes()?;
Ok(())
}
fn create(&mut self, task: NewRepoTask) -> Result<TaskId, AtexError> {
let query = format!(
r#"
SELECT id FROM {t}
WHERE lock = ? AND status IN (?, ?)
LIMIT 1"#,
t = DbTask::NAME
);
let got = self.ex.get_many(
&query,
&[&task.lock, &DbTask::STATUS_COMPLETED, &DbTask::STATUS_ERR],
|r| Ok(r.get(0)?),
)?;
if !got.is_empty() {
return Ok(got[0]);
}
let query = format!(
r#"
INSERT INTO {t}
(periodic_interval, executor, error, status,
payload, created, updated, execute_after,
lock, progress)
VALUES (?, ?, '', ?, ?, ?, ?, ?, ?, 0)
"#,
t = DbTask::NAME,
);
let id = self.ex.execute_return_id(
&query,
&[
&task.periodic_interval,
&task.executor,
&DbTask::STATUS_PROGRESS,
&task.payload,
&task.created,
&task.updated,
&task.execute_after,
&task.lock,
],
)?;
Ok(id as u64)
}
fn get_by_id(&self, id: TaskId) -> Result<Option<Task>, AtexError> {
let query = format!(
r#"
SELECT
id, periodic_interval, executor,
error, status, payload, created,
updated, execute_after, lock, progress
FROM {t}
WHERE id = ?
"#,
t = DbTask::NAME,
);
let res = self.ex.get_one(&query, &[&id], deser_task);
match res {
Ok(val) => Ok(Some(val)),
Err(DbError::NotFound(_)) => Ok(None),
Err(err) => Err(err.into()),
}
}
fn get_by_executor(&self, executor: &str) -> Result<Option<Task>, AtexError> {
let query = format!(
r#"
SELECT
id, periodic_interval, executor,
error, status, payload, created,
updated, execute_after, lock, progress
FROM {t}
WHERE executor = ?
"#,
t = DbTask::NAME,
);
let res = self.ex.get_one(&query, &[&executor], deser_task);
match res {
Ok(val) => Ok(Some(val)),
Err(DbError::NotFound(_)) => Ok(None),
Err(err) => Err(err.into()),
}
}
fn get_ready(&self, exclude: &[TaskId], limit: usize) -> Result<Vec<Task>, AtexError> {
let q_exclude = if exclude.is_empty() {
"".to_string()
} else {
format!(
"AND id NOT IN ({tmpl})",
tmpl = vec!["?"; exclude.len()].join(",")
)
};
let query = format!(
r#"
SELECT
id, periodic_interval, executor,
error, status, payload, created,
updated, execute_after, lock, progress
FROM {t}
WHERE status = ? AND execute_after <= ?
{q_exclude}
ORDER BY execute_after
LIMIT ?
"#,
t = DbTask::NAME,
q_exclude = q_exclude
);
let now_ts = now();
let mut params: Vec<&dyn ToSql> = vec![&DbTask::STATUS_PROGRESS, &now_ts];
for id in exclude {
params.push(id);
}
params.push(&limit);
let res = self.ex.get_many(&query, ¶ms, deser_task)?;
Ok(res)
}
fn set_status(&mut self, id: TaskId, task_status: TaskStatus) -> Result<(), AtexError> {
let (status, err, progress) = match task_status {
TaskStatus::Error(val) => (DbTask::STATUS_ERR, val, 0),
TaskStatus::InProgress(val) => (DbTask::STATUS_PROGRESS, "".to_string(), val),
TaskStatus::Completed => (DbTask::STATUS_COMPLETED, "".to_string(), 100),
};
let query = format!(
"UPDATE {t} SET status = ?, error = ?, progress = ?, updated = ? WHERE id = ?",
t = DbTask::NAME
);
self.ex
.execute(&query, &[&status, &err, &progress, &now(), &id])?;
Ok(())
}
fn set_progress(&mut self, id: TaskId, progress: u8) -> Result<(), AtexError> {
let progress = (progress as i8).min(DbTask::MAX_PROGRESS);
let query = format!(
"UPDATE {t} SET progress = ?, updated = ? WHERE id = ?",
t = DbTask::NAME
);
self.ex.execute(&query, &[&progress, &now(), &id])?;
Ok(())
}
fn set_execute_after(&mut self, id: TaskId, ts: u64) -> Result<(), AtexError> {
let query = format!(
"UPDATE {t} SET execute_after = ?, status = ?, updated = ? WHERE id = ?",
t = DbTask::NAME
);
self.ex
.execute(&query, &[&ts, &DbTask::MIN_PROGRESS, &now(), &id])?;
Ok(())
}
fn delete_completed_tasks(&mut self, max_lifetime: u64) -> Result<(), AtexError> {
let query = format!(
"DELETE FROM {t} WHERE status = ? AND updated < ?",
t = DbTask::NAME,
);
let threshold = now() - max_lifetime;
self.ex
.execute(&query, &[&DbTask::STATUS_COMPLETED, &threshold])?;
Ok(())
}
fn delete_error_tasks(&mut self, max_lifetime: u64) -> Result<(), AtexError> {
let query = format!(
"DELETE FROM {t} WHERE status = ? AND updated < ?",
t = DbTask::NAME,
);
let threshold = now() - max_lifetime;
self.ex
.execute(&query, &[&DbTask::STATUS_ERR, &threshold])?;
Ok(())
}
}