use crate::context::Context;
use crate::error::TaskError;
use crate::tasks::r#trait::{Task, TaskOutput};
use async_trait::async_trait;
use futures::future::BoxFuture;
use futures::FutureExt;
use std::future::Future;
use std::sync::Arc;
/// Type-erased loop body stored by `LoopTask`.
///
/// The callable receives the shared `Context` and a `u32` iteration index and
/// returns a boxed, `'static` future resolving to the iteration's
/// `Result<TaskOutput, TaskError>`. The `Send + Sync` bounds apply to the
/// callable itself.
type BodyFn = Box<
dyn Fn(Arc<Context>, u32) -> BoxFuture<'static, Result<TaskOutput, TaskError>> + Send + Sync,
>;
/// Type-erased predicate over a borrowed iteration output; `LoopTask` stops
/// looping when it returns `true`.
type BreakFn = Box<dyn Fn(&TaskOutput) -> bool + Send + Sync>;
/// A task that runs an async body repeatedly, up to a fixed number of
/// iterations, optionally stopping early when a predicate on the latest
/// output returns `true`.
pub struct LoopTask {
/// Identifier reported through `Task::id`.
id: String,
/// Dependency ids reported through `Task::dependencies`.
deps: Vec<String>,
/// Value reported through `Task::priority`.
priority: u8,
/// Upper bound on the number of times `body` is invoked per execution.
max_iterations: u32,
/// Boxed loop body, called with the shared context and the iteration index.
body: BodyFn,
/// Predicate evaluated on each iteration's output; `true` ends the loop.
break_when: BreakFn,
}
impl LoopTask {
pub fn new<F, Fut>(id: impl Into<String>, max_iterations: u32, body: F) -> Self
where
F: Fn(Arc<Context>, u32) -> Fut + Send + Sync + 'static,
Fut: Future<Output = Result<TaskOutput, TaskError>> + Send + 'static,
{
LoopTask {
id: id.into(),
deps: Vec::new(),
priority: 0,
max_iterations,
body: Box::new(move |ctx, i| body(ctx, i).boxed()),
break_when: Box::new(|_| false),
}
}
pub fn with_break<F>(mut self, break_when: F) -> Self
where
F: Fn(&TaskOutput) -> bool + Send + Sync + 'static,
{
self.break_when = Box::new(break_when);
self
}
pub fn with_deps<I, S>(mut self, deps: I) -> Self
where
I: IntoIterator<Item = S>,
S: Into<String>,
{
self.deps = deps.into_iter().map(Into::into).collect();
self
}
pub fn with_priority(mut self, priority: u8) -> Self {
self.priority = priority;
self
}
}
#[async_trait]
impl Task for LoopTask {
    /// Returns this task's identifier.
    fn id(&self) -> &str {
        self.id.as_str()
    }

    /// Returns a copy of the configured dependency ids.
    fn dependencies(&self) -> Vec<String> {
        self.deps.to_vec()
    }

    /// Returns the configured priority.
    fn priority(&self) -> u8 {
        self.priority
    }

    /// Runs the body up to `max_iterations` times and reports a summary.
    ///
    /// Before every iteration the context is checked for cancellation; if it
    /// is cancelled, `TaskError::Cancelled` is returned. An error from the
    /// body is propagated immediately. After each successful iteration the
    /// break predicate is evaluated on that iteration's output, and a `true`
    /// result ends the loop.
    ///
    /// On success the output is a JSON object with the number of completed
    /// iterations (`"iterations"`), the most recent body output (`"last"`,
    /// `null` if the body never ran) and whether the predicate ended the loop
    /// (`"broke_early"`).
    async fn execute(&self, ctx: Arc<Context>) -> Result<TaskOutput, TaskError> {
        let mut completed = 0u32;
        let mut latest = serde_json::Value::Null;
        let mut stopped_by_predicate = false;

        for index in 0..self.max_iterations {
            if ctx.is_cancelled() {
                return Err(TaskError::Cancelled);
            }

            latest = (self.body)(Arc::clone(&ctx), index).await?;
            // `index < max_iterations <= u32::MAX`, so this cannot overflow.
            completed = index + 1;

            stopped_by_predicate = (self.break_when)(&latest);
            if stopped_by_predicate {
                break;
            }
        }

        Ok(serde_json::json!({
            "iterations": completed,
            "last": latest,
            "broke_early": stopped_by_predicate,
        }))
    }
}