crius 0.3.1

Crius is a simple hystrix-like circuit breaker
Documentation
mod circuit_breaker;
mod circuit_breaker_stats;
mod window;
pub mod error;

use self::error::reject_error::RejectError;
use self::circuit_breaker::CircuitBreaker;
use std::error::Error;
use std::sync::{Arc, Mutex};
use std::sync::mpsc::Receiver;
use std::sync::mpsc;
use threadpool::ThreadPool;
use std::marker::PhantomData;

pub type CommandError = Error + Send + Sync + 'static;

#[derive(Copy, Clone, Debug)]
pub struct Config {
    pub error_threshold: Option<i32>,
    pub error_threshold_percentage: Option<i32>,
    pub buckets_in_window: Option<i32>,
    pub bucket_size_in_ms:  Option<u64>,
    pub circuit_open_ms: Option<u64>,
    pub threadpool_size: Option<i32>,
    pub circuit_breaker_enabled: Option<bool>
}

impl Config {
    pub fn new() -> Config {
        return Config {
            error_threshold: None,
            error_threshold_percentage: None,
            buckets_in_window: None,
            bucket_size_in_ms: None,
            circuit_open_ms: None,
            threadpool_size: None,
            circuit_breaker_enabled: None
        }
    }

    pub fn error_threshold(&mut self, error_threshold: i32) -> &mut Self {
        self.error_threshold = Some(error_threshold);
        return self;
    }

    pub fn error_threshold_percentage(&mut self, error_threshold_percentage: i32) -> &mut Self {
        self.error_threshold_percentage = Some(error_threshold_percentage);
        return self;
    }

    pub fn buckets_in_window(&mut self, buckets_in_window: i32) -> &mut Self {
        self.buckets_in_window = Some(buckets_in_window);
        return self;
    }

    pub fn bucket_size_in_ms(&mut self, bucket_size_in_ms: u64) -> &mut Self {
        self.bucket_size_in_ms = Some(bucket_size_in_ms);
        return self;
    }

    pub fn circuit_open_ms(&mut self, circuit_open_ms: u64) -> &mut Self {
        self.circuit_open_ms = Some(circuit_open_ms);
        return self;
    }

    pub fn circuit_breaker_enabled(&mut self, circuit_breaker_enabled: bool) -> &mut Self {
        self.circuit_breaker_enabled = Some(circuit_breaker_enabled);
        return self;
    }
}

pub struct Command<P, T, CMD> where T: Send, CMD: Fn(P) -> Result<T, Box<CommandError>> + Sync + Send {
    pub config: Option<Config>,
    pub cmd: CMD,
    phantom_data: PhantomData<P>
}

pub struct CommandWithFallback<P, T, CMD, FB> where T: Send, CMD: Fn(P) -> Result<T, Box<CommandError>> + Sync + Send, FB: Fn(Box<CommandError>) -> T + Sync + Send {
    pub fb: FB,
    pub config: Option<Config>,
    pub cmd: CMD,
    phantom_data: PhantomData<P>
}

impl <P, T, CMD> Command<P, T, CMD> where T: Send + 'static, CMD: Fn(P) -> Result<T, Box<CommandError>> + Sync + Send {
    pub fn define(cmd: CMD) -> Command<P, T, CMD> {
        return Command {
            cmd: cmd,
            config: None,
            phantom_data: PhantomData
        }
    }

    pub fn define_with_fallback<FB>(cmd: CMD, fallback: FB) -> CommandWithFallback<P, T, CMD, FB> where FB: Fn(Box<CommandError>) -> T + Sync + Send {
        return CommandWithFallback {
            cmd: cmd,
            fb: fallback,
            config: None,
            phantom_data: PhantomData
        }
    }
}

impl <P, T, CMD> Command<P, T, CMD> where P: Send + 'static, T: Send + 'static, CMD: Fn(P) -> Result<T, Box<CommandError>> + Sync + Send {

    pub fn config(mut self, config: Config) -> Self {
        self.config = Some(config);
        return self
    }

    pub fn create(self) -> RunnableCommand<P, T, CMD, fn(Box<CommandError>) -> T> {
        return RunnableCommand::new(self.cmd, None, self.config)
    }
}

impl <P, T, CMD, FB> CommandWithFallback<P, T, CMD, FB> where P: Send + 'static, T: Send + 'static, CMD: Fn(P) -> Result<T, Box<CommandError>> + Sync + Send, FB: Fn(Box<CommandError>) -> T + Sync + Send + 'static {
    pub fn config(mut self, config: Config) -> Self {
        self.config = Some(config);
        return self
    }

    pub fn create(self) -> RunnableCommand<P, T, CMD, FB> {
        return RunnableCommand::new(self.cmd, Some(self.fb), self.config)
    }
}

const DEFAULT_ERROR_THRESHOLD: i32 = 10;
const DEFAULT_ERROR_THRESHOLD_PERCENTAGE: i32 = 50;
const DEFAULT_BUCKETS_IN_WINDOW: i32 = 10;
const DEFAULT_BUCKET_SIZE_IN_MS: u64 = 1000;
const DEFAULT_CIRCUIT_OPEN_MS: u64 = 5000;
const DEFAULT_THREADPOOL_SIZE: i32 = 10;
const DEFAULT_CIRCUIT_BREAKER_ENABLED: bool = true;

pub struct RunnableCommand<P, T, CMD, FB> where T: Send + 'static, CMD: Fn(P) -> Result<T, Box<CommandError>> + Sync + Send + 'static, FB: Fn(Box<CommandError>) -> T + Sync + Send + 'static {
    command_params: Arc<Mutex<CommandParams<P, T, CMD, FB>>>,
    pool: ThreadPool
}

impl <P, T, CMD, FB> RunnableCommand<P, T, CMD, FB> where P: Send + 'static, T: Send + 'static, CMD: Fn(P) -> Result<T, Box<CommandError>> + Sync + Send + 'static, FB: Fn(Box<CommandError>) -> T + Sync + Send + 'static {

    fn new(cmd: CMD,
           fb: Option<FB>,
           config: Option<Config>) -> RunnableCommand<P, T, CMD, FB> {
        let final_config = Config {
            error_threshold: config.and_then(|c| c.error_threshold).or(Some(DEFAULT_ERROR_THRESHOLD)),
            error_threshold_percentage: config.and_then(|c| c.error_threshold_percentage).or(Some(DEFAULT_ERROR_THRESHOLD_PERCENTAGE)),
            buckets_in_window: config.and_then(|c| c.buckets_in_window).or(Some(DEFAULT_BUCKETS_IN_WINDOW)),
            bucket_size_in_ms: config.and_then(|c| c.bucket_size_in_ms).or(Some(DEFAULT_BUCKET_SIZE_IN_MS)),
            circuit_open_ms: config.and_then(|c| c.circuit_open_ms).or(Some(DEFAULT_CIRCUIT_OPEN_MS)),
            threadpool_size: config.and_then(|c| c.threadpool_size).or(Some(DEFAULT_THREADPOOL_SIZE)),
            circuit_breaker_enabled: config.and_then(|c| c.circuit_breaker_enabled).or(Some(DEFAULT_CIRCUIT_BREAKER_ENABLED))
        };

        return RunnableCommand {
            command_params: Arc::new(Mutex::new(CommandParams {
                config: final_config,
                cmd: cmd,
                fb: fb,
                circuit_breaker:CircuitBreaker::new(final_config),
                phantom_data: PhantomData
            })),
            pool: ThreadPool::new(1)
        }
    }

    pub fn run(&mut self, param: P) -> Receiver<Result<T, Box<CommandError>>> {
        let command = self.command_params.clone();
        let (tx, rx) = mpsc::channel();

        self.pool.execute(move || {
            let is_allowed = command.lock().unwrap().circuit_breaker.check_command_allowed();
            if !command.lock().unwrap().config.circuit_breaker_enabled.unwrap_or(true) {
                let res = (command.lock().unwrap().cmd)(param);
                tx.send(res).unwrap()
            } else if is_allowed {
                let res = (command.lock().unwrap().cmd)(param);
                command.lock().unwrap().circuit_breaker.register_result(&res);

                if command.lock().unwrap().fb.is_some() && res.is_err() {
                    let final_res = Ok(res.unwrap_or_else(command.lock().unwrap().fb.as_ref().unwrap()));
                    tx.send(final_res).unwrap()
                } else {
                    tx.send(res).unwrap()
                }
            } else if command.lock().unwrap().fb.is_some() {
                let res = (command.lock().unwrap().fb.as_ref().unwrap())(Box::new(RejectError {}));
                tx.send(Ok(res)).unwrap()
            } else {
                tx.send(Err(Box::new(RejectError {}) as Box<CommandError>)).unwrap()
            }

        });

        return rx
    }
}

struct CommandParams<P, T, CMD, FB> where T: Send + 'static, CMD: Fn(P) -> Result<T, Box<CommandError>> + Sync + Send + 'static, FB: Fn(Box<CommandError>) -> T + Sync + Send + 'static {
    config: Config,
    cmd: CMD,
    fb: Option<FB>,
    circuit_breaker: CircuitBreaker,
    phantom_data: PhantomData<P>
}