use crate::error::CanoError;
use crate::resource::Resources;
use cano_macros::task;
use std::borrow::Cow;
use std::fmt;
use std::hash::Hash;
pub mod batch;
pub mod poll;
mod retry;
pub mod router;
pub mod stepped;
pub use batch::{
BatchTask, BatchTaskObject, DefaultBatchItem, DefaultBatchItemOutput, DynBatchTask, run_batch,
};
pub use poll::{
DynPollTask, PollErrorPolicy, PollOutcome, PollTask, PollTaskObject, run_poll_loop,
};
pub use retry::{RetryMode, TaskConfig, run_with_retries};
pub use router::{DynRouterTask, RouterTask, RouterTaskObject};
pub use stepped::{
DefaultStepCursor, DynSteppedTask, StepOutcome, SteppedTask, SteppedTaskObject, run_stepped,
};
pub use cano_macros::batch_task as batch;
pub use cano_macros::poll_task as poll;
pub use cano_macros::router_task as router;
pub use cano_macros::stepped_task as stepped;
/// Outcome returned by a task run: either one state value or several.
///
/// NOTE(review): how a consumer treats `Split` (for example, fanning out to
/// each state) is not visible in this file — confirm against the workflow engine.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum TaskResult<TState> {
    /// Carries exactly one `TState` value.
    Single(TState),
    /// Carries a list of `TState` values.
    Split(Vec<TState>),
}
/// Asynchronous unit of work that yields a [`TaskResult`] over `TState`.
///
/// Implementors override [`Task::run`] when they need the [`Resources`]
/// passed in, or [`Task::run_bare`] when they do not. If neither is
/// overridden, the default `run` forwards to the default `run_bare`, which
/// returns a configuration error naming the implementing type.
///
/// `TResourceKey` is the key type of the [`Resources`] handed to `run`; it
/// defaults to `Cow<'static, str>`.
#[task]
pub trait Task<TState, TResourceKey = Cow<'static, str>>: Send + Sync
where
    TState: Clone + fmt::Debug + Send + Sync + 'static,
    TResourceKey: Hash + Eq + Send + Sync + 'static,
{
    /// Returns this task's configuration; `TaskConfig::default()` unless overridden.
    fn config(&self) -> TaskConfig {
        TaskConfig::default()
    }

    /// Returns a name for this task; defaults to the implementor's type name.
    fn name(&self) -> Cow<'static, str> {
        let type_name: &'static str = std::any::type_name::<Self>();
        Cow::Borrowed(type_name)
    }

    /// Runs the task with access to `res`.
    ///
    /// The default implementation ignores `res` and delegates to
    /// [`Task::run_bare`].
    ///
    /// # Errors
    ///
    /// Returns whatever error the underlying implementation produces.
    async fn run(&self, res: &Resources<TResourceKey>) -> Result<TaskResult<TState>, CanoError> {
        // The default path has no use for the resources; discard them explicitly.
        let _ = res;
        let outcome = self.run_bare().await;
        outcome
    }

    /// Runs the task without any resources.
    ///
    /// # Errors
    ///
    /// The default implementation always fails with a configuration error,
    /// signalling that the implementor overrode neither `run` nor `run_bare`.
    async fn run_bare(&self) -> Result<TaskResult<TState>, CanoError> {
        let message = format!(
            "Task<{}>: neither `run` nor `run_bare` was implemented; override one of them",
            std::any::type_name::<Self>(),
        );
        Err(CanoError::configuration(message))
    }
}
/// Trait-object form of [`Task`] with explicit `Send + Sync` bounds.
///
/// `TResourceKey` defaults to `Cow<'static, str>`, the same default used by [`Task`].
pub type DynTask<TState, TResourceKey = Cow<'static, str>> =
dyn Task<TState, TResourceKey> + Send + Sync;
/// Reference-counted handle to a type-erased task: an `Arc` wrapping [`DynTask`].
///
/// `TResourceKey` defaults to `Cow<'static, str>`.
pub type TaskObject<TState, TResourceKey = Cow<'static, str>> =
std::sync::Arc<DynTask<TState, TResourceKey>>;
#[cfg(test)]
mod tests {
use super::*;
use crate::resource::Resources;
use cano_macros::task;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use tokio;
/// State type used by the test tasks in this module.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
// `Retry` is never constructed by the tests visible here; keep the lint quiet.
#[allow(dead_code)]
enum TestAction {
    Continue,
    Complete,
    Error,
    Retry,
}
/// Test task that counts how many times its `run_bare` has been invoked.
struct SimpleTask {
    /// Incremented once per `run_bare` call.
    execution_count: Arc<AtomicU32>,
}

impl SimpleTask {
    /// Creates a task whose execution counter starts at zero.
    fn new() -> Self {
        let execution_count = Arc::new(AtomicU32::default());
        Self { execution_count }
    }

    /// Returns the number of `run_bare` calls recorded so far.
    fn execution_count(&self) -> u32 {
        let counter = &self.execution_count;
        counter.load(Ordering::SeqCst)
    }
}

#[task]
impl Task<TestAction> for SimpleTask {
    // Records the call, then reports `TestAction::Complete`.
    async fn run_bare(&self) -> Result<TaskResult<TestAction>, CanoError> {
        self.execution_count.fetch_add(1, Ordering::SeqCst);
        let finished = TaskResult::Single(TestAction::Complete);
        Ok(finished)
    }
}
/// Test task whose `run_bare` fails or succeeds depending on a flag.
struct FailingTask {
    /// When `true`, `run_bare` returns a task-execution error.
    should_fail: bool,
}

impl FailingTask {
    /// Creates a task that fails if and only if `should_fail` is `true`.
    fn new(should_fail: bool) -> Self {
        FailingTask { should_fail }
    }
}

#[task]
impl Task<TestAction> for FailingTask {
    // Error path first; otherwise report `TestAction::Complete`.
    async fn run_bare(&self) -> Result<TaskResult<TestAction>, CanoError> {
        if self.should_fail {
            return Err(CanoError::task_execution("Task intentionally failed"));
        }
        Ok(TaskResult::Single(TestAction::Complete))
    }
}
/// Test task that always returns a two-element `TaskResult::Split`.
struct SplitTask;

#[task]
impl Task<TestAction> for SplitTask {
    // Yields `Continue` followed by `Complete`, in that order.
    async fn run_bare(&self) -> Result<TaskResult<TestAction>, CanoError> {
        let branches = vec![TestAction::Continue, TestAction::Complete];
        Ok(TaskResult::Split(branches))
    }
}
// A single `run_bare` call completes and is counted exactly once.
#[tokio::test]
async fn test_simple_task_execution() {
    let simple = SimpleTask::new();

    let outcome = simple.run_bare().await.unwrap();

    let expected = TaskResult::Single(TestAction::Complete);
    assert_eq!(outcome, expected);
    assert_eq!(simple.execution_count(), 1);
}
// `FailingTask` succeeds when the flag is off and surfaces its error when on.
#[tokio::test]
async fn test_failing_task() {
    // Success path.
    let passing = FailingTask::new(false);
    let outcome = passing.run_bare().await.unwrap();
    assert_eq!(outcome, TaskResult::Single(TestAction::Complete));

    // Failure path: the error text must carry the task's message.
    let failing = FailingTask::new(true);
    let outcome = failing.run_bare().await;
    assert!(outcome.is_err());
    let rendered = outcome.unwrap_err().to_string();
    assert!(rendered.contains("Task intentionally failed"));
}
// `SplitTask` returns both states, in declaration order.
#[tokio::test]
async fn test_split_task() {
    let expected = TaskResult::Split(vec![TestAction::Continue, TestAction::Complete]);

    let outcome = SplitTask.run_bare().await.unwrap();

    assert_eq!(outcome, expected);
}
// A task overriding neither `run` nor `run_bare` must fail with a
// configuration error that mentions the implementing type.
#[tokio::test]
async fn test_unimplemented_run_returns_configuration_error() {
    struct ForgotToImplement;

    #[task]
    impl Task<TestAction> for ForgotToImplement {}

    let resources = Resources::new();
    let forgetful = ForgotToImplement;

    let err = forgetful.run(&resources).await.unwrap_err();

    let category = err.category();
    assert_eq!(category, "configuration");
    assert!(
        err.message().contains("ForgotToImplement"),
        "error should name the offending type, got: {}",
        err.message()
    );
}
// Ten spawned runs of one shared task all succeed and are all counted.
#[tokio::test]
async fn test_concurrent_task_execution() {
    use tokio::task;

    let shared = Arc::new(SimpleTask::new());

    // `collect` drives the iterator, so every run is spawned before any is awaited.
    let handles: Vec<_> = (0..10)
        .map(|_| {
            let worker = Arc::clone(&shared);
            task::spawn(async move { worker.run_bare().await })
        })
        .collect();

    let mut success_count = 0;
    for handle in handles {
        let joined = handle.await.unwrap();
        if matches!(joined, Ok(TaskResult::Single(TestAction::Complete))) {
            success_count += 1;
        }
    }

    assert_eq!(success_count, 10);
    assert_eq!(shared.execution_count(), 10);
}
// Running the same task repeatedly bumps the counter by one each time.
#[tokio::test]
async fn test_multiple_task_executions() {
    let counted = SimpleTask::new();
    let expected = TaskResult::Single(TestAction::Complete);

    for expected_runs in 1..=5 {
        let outcome = counted.run_bare().await.unwrap();
        assert_eq!(outcome, expected);
        assert_eq!(counted.execution_count(), expected_runs);
    }
}
/// Test task that overrides only `run_bare`, leaving `run` at its default.
struct BareTask;

#[task]
impl Task<TestAction> for BareTask {
    // Always reports `TestAction::Complete`.
    async fn run_bare(&self) -> Result<TaskResult<TestAction>, CanoError> {
        let finished = TaskResult::Single(TestAction::Complete);
        Ok(finished)
    }
}
struct ExplicitRunTask {
bare_called: Arc<AtomicU32>,
}
#[task]
impl Task<TestAction> for ExplicitRunTask {
async fn run(&self, _res: &Resources) -> Result<TaskResult<TestAction>, CanoError> {
Ok(TaskResult::Single(TestAction::Continue))
}
async fn run_bare(&self) -> Result<TaskResult<TestAction>, CanoError> {
self.bare_called.fetch_add(1, Ordering::SeqCst);
Ok(TaskResult::Single(TestAction::Error))
}
}
// With `run` left at its default, calling `run` yields `run_bare`'s result.
#[tokio::test]
async fn test_run_bare_called_when_run_not_overridden() {
    let resources = Resources::new();
    let bare = BareTask;

    let outcome = bare.run(&resources).await.unwrap();

    let expected = TaskResult::Single(TestAction::Complete);
    assert_eq!(outcome, expected);
}
// An overridden `run` is used as-is and never reaches `run_bare`.
#[tokio::test]
async fn test_run_overrides_bypass_bare() {
    let bare_calls = Arc::new(AtomicU32::new(0));
    let explicit = ExplicitRunTask {
        bare_called: Arc::clone(&bare_calls),
    };
    let resources = Resources::new();

    let outcome = explicit.run(&resources).await.unwrap();

    assert_eq!(outcome, TaskResult::Single(TestAction::Continue));
    let observed_bare_calls = bare_calls.load(Ordering::SeqCst);
    assert_eq!(observed_bare_calls, 0);
}
}