crb-task 0.0.20

CRB | Composable Runtime Blocks | Task
Documentation
use anyhow::{Error, Result};
use async_trait::async_trait;
use crb_runtime::kit::{Controller, Failures, Interruptor, Runtime, Task};
use futures::stream::Abortable;

pub trait HybrydState: Send + 'static {}

impl<T> HybrydState for T where T: Send + 'static {}

pub struct NextState<T: ?Sized> {
    transition: Box<dyn StatePerformer<T>>,
}

impl<T> NextState<T>
where
    T: HybrydTask,
{
    pub(crate) fn new(performer: impl StatePerformer<T>) -> Self {
        Self {
            transition: Box::new(performer),
        }
    }
}

pub enum Transition<T> {
    Next(T, Result<NextState<T>>),
    Crashed(Error),
    Interrupted,
}

#[async_trait]
pub trait StatePerformer<T>: Send + 'static {
    async fn perform(&mut self, task: T, session: &mut HybrydSession) -> Transition<T>;
    async fn fallback(&mut self, task: T, err: Error) -> (T, NextState<T>);
}

pub trait HybrydTask: Sized + Send + 'static {
    fn initial_state(&mut self) -> NextState<Self>;
}

pub struct HybrydSession {
    pub controller: Controller,
}

pub struct DoHybrid<T> {
    pub task: Option<T>,
    pub session: HybrydSession,
    pub failures: Failures,
}

impl<T: HybrydTask> DoHybrid<T> {
    pub fn new(task: T) -> Self {
        let session = HybrydSession {
            controller: Controller::default(),
        };
        Self {
            task: Some(task),
            session,
            failures: Failures::default(),
        }
    }
}

impl<T: HybrydTask> Task<T> for DoHybrid<T> {}

impl<T: HybrydTask> DoHybrid<T> {
    async fn perform_routine(&mut self) -> Result<(), Error> {
        let reg = self.session.controller.take_registration()?;
        let fut = self.perform_task();
        Abortable::new(fut, reg).await??;
        Ok(())
    }

    async fn perform_task(&mut self) -> Result<(), Error> {
        if let Some(mut task) = self.task.take() {
            let session = &mut self.session;
            let initial_state = task.initial_state();
            let mut pair = (task, initial_state);
            loop {
                let (task, mut next_state) = pair;
                let res = next_state.transition.perform(task, session).await;
                match res {
                    Transition::Next(task, Ok(next_state)) => {
                        pair = (task, next_state);
                    }
                    Transition::Next(task, Err(err)) => {
                        let (task, next_state) = next_state.transition.fallback(task, err).await;
                        pair = (task, next_state);
                    }
                    Transition::Crashed(err) => {
                        return Err(err);
                    }
                    Transition::Interrupted => {
                        break;
                    }
                }
            }
        }
        Ok(())
    }
}

#[async_trait]
impl<T> Runtime for DoHybrid<T>
where
    T: HybrydTask,
{
    fn get_interruptor(&mut self) -> Interruptor {
        self.session.controller.interruptor.clone()
    }

    async fn routine(&mut self) {
        let result = self.perform_routine().await;
        self.failures.put(result);
    }
}