use std::{future::Future, marker::PhantomData, pin::Pin, time::Duration};
use ::tokio::time::sleep;
use chrono::{DateTime, Utc};
#[cfg_attr(test, mockall::automock)]
pub trait Task<ERR: Send + Sync, EXEC: Send + Sync>: Send + Sync {
fn status(&self, exec: &EXEC) -> impl Future<Output = Result<TaskStatus, ERR>> + Send;
fn delete(&self, exec: &EXEC) -> impl Future<Output = Result<(), ERR>> + Send;
fn start(&self, exec: &EXEC) -> impl Future<Output = Result<TaskStatus, ERR>> + Send;
}
pub trait TaskGroup<ERR: Send + Sync, EXEC: Send + Sync, TASK: Task<ERR, EXEC> + 'static>:
Send + Sync
{
fn next(&self) -> Option<&Self>;
fn tasks(&self)
-> impl Future<Output = Result<impl Iterator<Item = &TASK> + Send, ERR>> + Send;
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct FinishedTaskData {
pub deleted: bool,
pub finished_at: DateTime<Utc>,
pub started_at: DateTime<Utc>,
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TaskStatus {
Failed(FinishedTaskData),
Pending,
Running {
started_at: DateTime<Utc>,
},
Succeeded(FinishedTaskData),
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[cfg_attr(feature = "py", pyo3::pyclass)]
pub enum TaskGroupStatus {
Failed,
Running,
Succeeded,
}
pub struct LoopOrchestrator {
delay: Duration,
}
impl LoopOrchestrator {
pub fn new() -> Self {
Self {
delay: Duration::from_secs(0),
}
}
pub fn with_delay(mut self, delay: Duration) -> Self {
self.delay = delay;
self
}
pub async fn process<
'a,
ERR: Send + Sync,
EXEC: Send + Sync,
GROUP: TaskGroup<ERR, EXEC, TASK>,
TASK: Task<ERR, EXEC> + 'static,
>(
&self,
mut group: &'a GROUP,
exec: &EXEC,
) -> Result<TaskGroupStatus, ERR> {
loop {
let (status, next) = Self::do_process(group, exec).await?;
if let Some(next) = next {
group = next;
sleep(self.delay).await;
} else {
break Ok(status);
}
}
}
#[allow(clippy::type_complexity)]
fn do_process<
'a,
ERR: Send + Sync,
EXEC: Send + Sync,
GROUP: TaskGroup<ERR, EXEC, TASK>,
TASK: Task<ERR, EXEC> + 'static,
>(
group: &'a GROUP,
exec: &'a EXEC,
) -> Pin<Box<dyn Future<Output = Result<(TaskGroupStatus, Option<&'a GROUP>), ERR>> + Send + 'a>>
{
Box::pin(async move {
let mut count = 0;
let mut finished = 0;
let mut succeeded = 0;
let tasks = group.tasks().await?;
for task in tasks {
count += 1;
let status = task.status(exec).await?;
Self::handle_task_status(&status, task, &mut finished, &mut succeeded, exec)
.await?;
}
if count == finished {
if succeeded == count {
if let Some(next) = group.next() {
Self::do_process(next, exec).await
} else {
Ok((TaskGroupStatus::Succeeded, None))
}
} else {
Ok((TaskGroupStatus::Failed, None))
}
} else {
Ok((TaskGroupStatus::Running, Some(group)))
}
})
}
fn handle_task_status<'a, ERR: Send + Sync, EXEC: Send + Sync, TASK: Task<ERR, EXEC>>(
status: &'a TaskStatus,
task: &'a TASK,
finished: &'a mut usize,
succeeded: &'a mut usize,
exec: &'a EXEC,
) -> Pin<Box<dyn Future<Output = Result<(), ERR>> + Send + 'a>> {
Box::pin(async move {
match status {
TaskStatus::Failed(data) => {
if !data.deleted {
task.delete(exec).await?;
}
*finished += 1;
}
TaskStatus::Pending => {
let status = task.start(exec).await?;
Self::handle_task_status(&status, task, finished, succeeded, exec).await?;
}
TaskStatus::Succeeded(data) => {
if !data.deleted {
task.delete(exec).await?;
}
*finished += 1;
*succeeded += 1;
}
_ => {}
}
Ok(())
})
}
}
impl Default for LoopOrchestrator {
fn default() -> Self {
Self::new()
}
}
pub struct VecTaskGroup<ERR: Send + Sync, EXEC: Send + Sync, TASK: Task<ERR, EXEC>> {
next: Option<Box<Self>>,
tasks: Vec<TASK>,
_err: PhantomData<ERR>,
_exec: PhantomData<EXEC>,
}
impl<ERR: Send + Sync, EXEC: Send + Sync, TASK: Task<ERR, EXEC>> VecTaskGroup<ERR, EXEC, TASK> {
pub fn new<TASKS: IntoIterator<Item = TASK>>(tasks: TASKS) -> Self {
Self {
next: None,
tasks: tasks.into_iter().collect(),
_err: PhantomData,
_exec: PhantomData,
}
}
pub fn then(mut self, group: Self) -> Self {
self.next = Some(Box::new(group));
self
}
}
impl<ERR: Send + Sync, EXEC: Send + Sync, TASK: Task<ERR, EXEC> + 'static>
TaskGroup<ERR, EXEC, TASK> for VecTaskGroup<ERR, EXEC, TASK>
{
fn next(&self) -> Option<&Self> {
self.next.as_ref().map(|group| group.as_ref())
}
async fn tasks(&self) -> Result<impl Iterator<Item = &TASK>, ERR> {
Ok(self.tasks.iter())
}
}
#[cfg(feature = "py")]
pub mod py;
#[cfg(feature = "tokio")]
pub mod tokio;
#[cfg(test)]
mod test {
use chrono::Utc;
use super::{
FinishedTaskData, LoopOrchestrator, MockTask, TaskGroupStatus, TaskStatus, VecTaskGroup,
};
#[derive(Debug)]
struct MockErr;
struct MockExecutor;
mod loop_orchestrator {
use super::*;
mod do_process {
use super::*;
#[tokio::test]
async fn start_a() {
let a_status_init = TaskStatus::Pending;
let a_status_final = TaskStatus::Running {
started_at: Utc::now(),
};
let b_status_init = TaskStatus::Running {
started_at: Utc::now(),
};
let c_status_init = TaskStatus::Pending;
let (mut a, b, c) = mock_tasks(a_status_init, b_status_init, c_status_init);
a.expect_start()
.returning(move |_| Box::pin(async move { Ok(a_status_final) }));
let group = VecTaskGroup::new([a, b]).then(VecTaskGroup::new([c]));
let (status, group) = LoopOrchestrator::do_process(&group, &MockExecutor)
.await
.unwrap();
assert_eq!(status, TaskGroupStatus::Running);
let group = group.unwrap();
assert_eq!(group.tasks.len(), 2);
}
#[tokio::test]
async fn start_c() {
let a_status_init = TaskStatus::Pending;
let a_status_final = TaskStatus::Succeeded(FinishedTaskData {
deleted: false,
finished_at: Utc::now(),
started_at: Utc::now(),
});
let b_status_init = TaskStatus::Succeeded(FinishedTaskData {
deleted: true,
finished_at: Utc::now(),
started_at: Utc::now(),
});
let c_status_init = TaskStatus::Pending;
let c_status_final = TaskStatus::Running {
started_at: Utc::now(),
};
let (mut a, b, mut c) = mock_tasks(a_status_init, b_status_init, c_status_init);
a.expect_start()
.returning(move |_| Box::pin(async move { Ok(a_status_final) }));
a.expect_delete().returning(|_| Box::pin(async { Ok(()) }));
c.expect_start()
.returning(move |_| Box::pin(async move { Ok(c_status_final) }));
let group = VecTaskGroup::new([a, b]).then(VecTaskGroup::new([c]));
let (status, group) = LoopOrchestrator::do_process(&group, &MockExecutor)
.await
.unwrap();
assert_eq!(status, TaskGroupStatus::Running);
let group = group.unwrap();
assert_eq!(group.tasks.len(), 1);
}
#[tokio::test]
async fn failed() {
let a_status_init = TaskStatus::Succeeded(FinishedTaskData {
deleted: true,
finished_at: Utc::now(),
started_at: Utc::now(),
});
let b_status_init = TaskStatus::Failed(FinishedTaskData {
deleted: false,
finished_at: Utc::now(),
started_at: Utc::now(),
});
let c_status_init = TaskStatus::Pending;
let (a, mut b, c) = mock_tasks(a_status_init, b_status_init, c_status_init);
b.expect_delete().returning(|_| Box::pin(async { Ok(()) }));
let group = VecTaskGroup::new([a, b]).then(VecTaskGroup::new([c]));
let (status, group) = LoopOrchestrator::do_process(&group, &MockExecutor)
.await
.unwrap();
assert_eq!(status, TaskGroupStatus::Failed);
assert!(group.is_none());
}
#[tokio::test]
async fn succeeded() {
let a_status_init = TaskStatus::Succeeded(FinishedTaskData {
deleted: true,
finished_at: Utc::now(),
started_at: Utc::now(),
});
let b_status_init = TaskStatus::Succeeded(FinishedTaskData {
deleted: true,
finished_at: Utc::now(),
started_at: Utc::now(),
});
let c_status_init = TaskStatus::Succeeded(FinishedTaskData {
deleted: true,
finished_at: Utc::now(),
started_at: Utc::now(),
});
let (a, b, c) = mock_tasks(a_status_init, b_status_init, c_status_init);
let group = VecTaskGroup::new([a, b]).then(VecTaskGroup::new([c]));
let (status, group) = LoopOrchestrator::do_process(&group, &MockExecutor)
.await
.unwrap();
assert_eq!(status, TaskGroupStatus::Succeeded);
assert!(group.is_none());
}
fn mock_tasks(
a_status: TaskStatus,
b_status: TaskStatus,
c_status: TaskStatus,
) -> (
MockTask<MockErr, MockExecutor>,
MockTask<MockErr, MockExecutor>,
MockTask<MockErr, MockExecutor>,
) {
let mut a = MockTask::new();
a.expect_status()
.returning(move |_| Box::pin(async move { Ok(a_status) }));
let mut b = MockTask::new();
b.expect_status()
.returning(move |_| Box::pin(async move { Ok(b_status) }));
let mut c = MockTask::new();
c.expect_status()
.returning(move |_| Box::pin(async move { Ok(c_status) }));
(a, b, c)
}
}
}
}