json-job-dispatch 3.1.0

Dispatch jobs described by JSON files and sort them according to their status.
Documentation
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.

use std::error::Error;

use async_trait::async_trait;
use serde_json::Value;

use crate::director::Director;

/// The outcome a handler reports after looking at an event.
#[derive(Debug)]
pub enum JobResult {
    /// The handler took the event and acted on it.
    Accept,
    /// Handling is postponed; the string carries the reason.
    Defer(String),
    /// The event was refused; the string carries the reason.
    Reject(String),
    /// Handling failed; the boxed error describes what went wrong.
    Fail(Box<dyn Error + Send + 'static>),
    /// The director should be restarted.
    Restart,
    /// No further events should be processed after this one.
    Done,
}

impl JobResult {
    /// Build an [`Accept`](JobResult::Accept) result.
    pub fn accept() -> Self {
        JobResult::Accept
    }

    /// Build a [`Defer`](JobResult::Defer) result whose reason is `msg`.
    pub fn defer<M>(msg: M) -> Self
    where
        M: Into<String>,
    {
        let reason: String = msg.into();
        JobResult::Defer(reason)
    }

    /// Build a [`Reject`](JobResult::Reject) result whose reason is `msg`.
    pub fn reject<M>(msg: M) -> Self
    where
        M: Into<String>,
    {
        let reason: String = msg.into();
        JobResult::Reject(reason)
    }

    /// Build a [`Fail`](JobResult::Fail) result wrapping `err`.
    ///
    /// Anything convertible into a `Send + Sync` boxed error is accepted (string slices
    /// included). The variant itself only declares `Send`, so the `Sync` marker is dropped
    /// when the box is stored.
    pub fn fail<E>(err: E) -> Self
    where
        E: Into<Box<dyn Error + Send + Sync + 'static>>,
    {
        let boxed: Box<dyn Error + Send + Sync + 'static> = err.into();
        JobResult::Fail(boxed)
    }

    /// Build a [`Restart`](JobResult::Restart) result.
    pub fn restart() -> Self {
        JobResult::Restart
    }

    /// Build a [`Done`](JobResult::Done) result.
    pub fn done() -> Self {
        JobResult::Done
    }

    /// Merge the results of two handlers into a single result.
    ///
    /// Precedence, from strongest to weakest, is `Done`, `Restart`, `Defer`, `Fail`,
    /// `Reject`, `Accept`:
    ///
    /// * `Accept` always yields to whatever the other side reports.
    /// * Two `Defer`s or two `Reject`s are merged by joining their reasons with a newline,
    ///   `self`'s reason first.
    /// * Two `Fail`s keep the error from `self`; the error from `other` is discarded.
    /// * In every other pairing the stronger result is returned unchanged and the weaker
    ///   one (including any payload) is dropped.
    pub fn combine(self, other: Self) -> Self {
        // Joins two reasons as `first`, newline, `second`, reusing `first`'s allocation.
        fn join_lines(mut first: String, second: String) -> String {
            first.push('\n');
            first.push_str(&second);
            first
        }

        match (self, other) {
            // `Accept` is the neutral element: the opposite side decides.
            (JobResult::Accept, rhs) => rhs,
            (lhs, JobResult::Accept) => lhs,
            // `Done` outranks everything that is left.
            (JobResult::Done, _) | (_, JobResult::Done) => JobResult::Done,
            // With `Done` out of the way, `Restart` outranks the rest.
            (JobResult::Restart, _) | (_, JobResult::Restart) => JobResult::Restart,
            // Two deferrals merge their reasons...
            (JobResult::Defer(first), JobResult::Defer(second)) => {
                JobResult::Defer(join_lines(first, second))
            },
            // ...while a lone deferral beats failures and rejections.
            (lone @ JobResult::Defer(_), _) => lone,
            (_, lone @ JobResult::Defer(_)) => lone,
            // A failure beats a rejection; with two failures the left one is kept.
            (failure @ JobResult::Fail(_), _) => failure,
            (_, failure @ JobResult::Fail(_)) => failure,
            // Only a pair of rejections can reach this point; merge their reasons.
            (JobResult::Reject(first), JobResult::Reject(second)) => {
                JobResult::Reject(join_lines(first, second))
            },
        }
    }
}

/// An error which can occur when handling a job.
///
/// This is a boxed, type-erased error that is both `Send` and `Sync`. It is the error type
/// returned by [`HandlerCore::add_to_director`] and [`Handler::handle`].
///
/// Note that this is not the same type as the payload of [`JobResult::Fail`], which only
/// declares `Send`.
pub type JobError = Box<dyn Error + Send + Sync>;

/// Interface for handling events.
///
/// This trait only covers registering a handler with a [`Director`]; it contains no `async`
/// methods. The asynchronous entry point lives in [`Handler`], which requires this trait as a
/// supertrait.
pub trait HandlerCore {
    /// Adds the handler to a director.
    ///
    /// The handler is borrowed for `'a`, the same lifetime that parameterizes the director.
    // NOTE(review): presumably this lets the director keep a reference to the handler for as
    // long as the director lives -- confirm against `Director`.
    ///
    /// # Errors
    ///
    /// Returns a [`JobError`] when the implementation cannot register itself; the exact
    /// conditions are defined by each implementor.
    fn add_to_director<'a>(&'a self, director: &mut Director<'a>) -> Result<(), JobError>;
}

/// Interface for handling events asynchronously.
///
/// Every `Handler` must also implement [`HandlerCore`]. The trait is declared through the
/// `#[async_trait]` attribute so that it can contain an `async fn`.
#[async_trait]
pub trait Handler: HandlerCore {
    /// The JSON object is passed in and acted upon.
    ///
    /// # Parameters
    ///
    /// * `kind` - a string identifying the event.
    /// * `object` - the JSON value to act upon.
    /// * `retry_count` - a counter supplied by the caller.
    // NOTE(review): `kind` is presumably the job kind the handler was registered for, and
    // `retry_count` presumably the number of times this job has already been retried --
    // confirm both against the director implementation.
    ///
    /// # Returns
    ///
    /// `Ok` with a [`JobResult`] describing the outcome, or `Err` with a [`JobError`].
    // NOTE(review): how a returned `Err` is treated compared to `Ok(JobResult::Fail(..))` is
    // decided by the caller and is not visible in this file.
    async fn handle(
        &self,
        kind: &str,
        object: &Value,
        retry_count: usize,
    ) -> Result<JobResult, JobError>;
}

#[cfg(test)]
mod tests {
    use crate::JobResult;

    /// Each constructor must return its matching variant and keep the payload intact.
    #[test]
    fn test_job_result_ctors() {
        assert!(matches!(JobResult::accept(), JobResult::Accept));
        if let JobResult::Defer(d) = JobResult::defer("hi") {
            assert_eq!(d, "hi");
        } else {
            panic!("`JobResult::defer` doesn't return a `JobResult::Defer` variant");
        }
        if let JobResult::Reject(d) = JobResult::reject("hi") {
            assert_eq!(d, "hi");
        } else {
            panic!("`JobResult::reject` doesn't return a `JobResult::Reject` variant");
        }
        if let JobResult::Fail(d) = JobResult::fail("hi") {
            assert_eq!(d.to_string(), "hi");
        } else {
            panic!("`JobResult::fail` doesn't return a `JobResult::Fail` variant");
        }
        assert!(matches!(JobResult::restart(), JobResult::Restart));
        assert!(matches!(JobResult::done(), JobResult::Done));
    }

    /// Walks the full 6x6 matrix of `left.combine(right)` pairings, one row per left-hand
    /// variant, and checks both the resulting variant and (where present) its payload.
    #[test]
    fn test_job_result_combine() {
        // Short aliases for the constructors; the generic ones are pinned to `&str` because
        // every payload used below is a string literal.
        let accept = JobResult::accept;
        let defer = JobResult::defer::<&str>;
        let reject = JobResult::reject::<&str>;
        let fail = JobResult::fail::<&str>;
        let restart = JobResult::restart;
        let done = JobResult::done;

        // Left-hand side: `Accept`.
        assert!(matches!(accept().combine(accept()), JobResult::Accept));
        if let JobResult::Defer(d) = accept().combine(defer("defer2")) {
            assert_eq!(d, "defer2");
        } else {
            panic!("`JobResult::Accept` combined with `::Defer` should return a `::Defer`");
        }
        if let JobResult::Reject(d) = accept().combine(reject("reject2")) {
            assert_eq!(d, "reject2");
        } else {
            panic!("`JobResult::Accept` combined with `::Reject` should return a `::Reject`");
        }
        if let JobResult::Fail(d) = accept().combine(fail("fail2")) {
            assert_eq!(d.to_string(), "fail2");
        } else {
            panic!("`JobResult::Accept` combined with `::Fail` should return a `::Fail`");
        }
        assert!(matches!(accept().combine(restart()), JobResult::Restart));
        assert!(matches!(accept().combine(done()), JobResult::Done));

        // Left-hand side: `Defer`.
        if let JobResult::Defer(d) = defer("defer1").combine(accept()) {
            assert_eq!(d, "defer1");
        } else {
            panic!("`JobResult::Defer` combined with `::Accept` should return a `::Defer`");
        }
        if let JobResult::Defer(d) = defer("defer1").combine(defer("defer2")) {
            assert_eq!(d, "defer1\ndefer2");
        } else {
            panic!("`JobResult::Defer` combined with `::Defer` should return a `::Defer`");
        }
        if let JobResult::Defer(d) = defer("defer1").combine(reject("reject2")) {
            assert_eq!(d, "defer1");
        } else {
            panic!("`JobResult::Defer` combined with `::Reject` should return a `::Defer`");
        }
        if let JobResult::Defer(d) = defer("defer1").combine(fail("fail2")) {
            assert_eq!(d, "defer1");
        } else {
            panic!("`JobResult::Defer` combined with `::Fail` should return a `::Defer`");
        }
        assert!(matches!(
            defer("defer1").combine(restart()),
            JobResult::Restart,
        ));
        assert!(matches!(defer("defer1").combine(done()), JobResult::Done));

        // Left-hand side: `Reject`.
        if let JobResult::Reject(d) = reject("reject1").combine(accept()) {
            assert_eq!(d, "reject1");
        } else {
            panic!("`JobResult::Reject` combined with `::Accept` should return a `::Reject`");
        }
        if let JobResult::Defer(d) = reject("reject1").combine(defer("defer2")) {
            assert_eq!(d, "defer2");
        } else {
            panic!("`JobResult::Reject` combined with `::Defer` should return a `::Defer`");
        }
        if let JobResult::Reject(d) = reject("reject1").combine(reject("reject2")) {
            assert_eq!(d, "reject1\nreject2");
        } else {
            panic!("`JobResult::Reject` combined with `::Reject` should return a `::Reject`");
        }
        if let JobResult::Fail(d) = reject("reject1").combine(fail("fail2")) {
            assert_eq!(d.to_string(), "fail2");
        } else {
            panic!("`JobResult::Reject` combined with `::Fail` should return a `::Fail`");
        }
        assert!(matches!(
            reject("reject1").combine(restart()),
            JobResult::Restart,
        ));
        assert!(matches!(reject("reject1").combine(done()), JobResult::Done));

        // Left-hand side: `Fail`.
        if let JobResult::Fail(d) = fail("fail1").combine(accept()) {
            assert_eq!(d.to_string(), "fail1");
        } else {
            panic!("`JobResult::Fail` combined with `::Accept` should return a `::Fail`");
        }
        if let JobResult::Defer(d) = fail("fail1").combine(defer("defer2")) {
            assert_eq!(d, "defer2");
        } else {
            panic!("`JobResult::Fail` combined with `::Defer` should return a `::Defer`");
        }
        if let JobResult::Fail(d) = fail("fail1").combine(reject("reject2")) {
            assert_eq!(d.to_string(), "fail1");
        } else {
            panic!("`JobResult::Fail` combined with `::Reject` should return a `::Fail`");
        }
        // With two failures the left-hand error is the one that survives.
        if let JobResult::Fail(d) = fail("fail1").combine(fail("fail2")) {
            assert_eq!(d.to_string(), "fail1");
        } else {
            panic!("`JobResult::Fail` combined with `::Fail` should return a `::Fail`");
        }
        assert!(matches!(
            fail("fail1").combine(restart()),
            JobResult::Restart,
        ));
        assert!(matches!(fail("fail1").combine(done()), JobResult::Done));

        // Left-hand side: `Restart` (only `Done` outranks it).
        assert!(matches!(restart().combine(accept()), JobResult::Restart));
        assert!(matches!(
            restart().combine(defer("defer2")),
            JobResult::Restart,
        ));
        assert!(matches!(
            restart().combine(reject("reject2")),
            JobResult::Restart,
        ));
        assert!(matches!(
            restart().combine(fail("fail2")),
            JobResult::Restart,
        ));
        assert!(matches!(restart().combine(restart()), JobResult::Restart));
        assert!(matches!(restart().combine(done()), JobResult::Done));

        // Left-hand side: `Done` (always wins).
        assert!(matches!(done().combine(accept()), JobResult::Done));
        assert!(matches!(done().combine(defer("defer2")), JobResult::Done));
        assert!(matches!(done().combine(reject("reject2")), JobResult::Done));
        assert!(matches!(done().combine(fail("fail2")), JobResult::Done));
        assert!(matches!(done().combine(restart()), JobResult::Done));
        assert!(matches!(done().combine(done()), JobResult::Done));
    }
}