use std::fmt::{self, Debug};
use std::sync::Arc;
use std::time::{Duration, Instant};
use parking_lot::Mutex;
use super::clock;
use super::failure_policy::FailurePolicy;
use super::instrument::Instrument;
// Bit flags collected while the shared state is locked. Each one names the
// `Instrument` callback that has to be fired once the lock has been released.

/// Flag: `Instrument::on_closed` must be called.
const ON_CLOSED: u8 = 1 << 0;
/// Flag: `Instrument::on_half_open` must be called.
const ON_HALF_OPEN: u8 = 1 << 1;
/// Flag: `Instrument::on_call_rejected` must be called.
const ON_REJECTED: u8 = 1 << 2;
/// Flag: `Instrument::on_open` must be called.
const ON_OPEN: u8 = 1 << 3;
/// Internal state of the state machine.
#[derive(Debug)]
enum State {
/// Calls are permitted.
Closed,
/// Calls are rejected until the clock has passed the stored `Instant`; the
/// `Duration` is the delay that instant was computed from.
Open(Instant, Duration),
/// Calls are permitted again; the `Duration` is the delay carried over from the
/// preceding `Open` state and is reused as a fallback when the machine reopens.
HalfOpen(Duration),
}
/// Mutable part of the state machine; it is stored behind the mutex in `Inner`.
struct Shared<POLICY> {
/// Current state of the machine.
state: State,
/// Failure policy that is told about successes and errors and decides when (and
/// for how long) the machine opens.
failure_policy: POLICY,
}
/// Data held behind the `Arc` of a `StateMachine` and shared by all of its clones.
struct Inner<POLICY, INSTRUMENT> {
/// Lock-protected state and failure policy.
shared: Mutex<Shared<POLICY>>,
/// Receiver of state-change and call-rejection notifications; it lives outside
/// the mutex.
instrument: INSTRUMENT,
}
/// State machine with the states closed, open and half-open, driven by a failure
/// policy and reporting its transitions to an instrument.
///
/// Clones are handles to the same underlying state: the data lives behind an `Arc`.
pub struct StateMachine<POLICY, INSTRUMENT> {
/// Shared state, failure policy and instrument.
inner: Arc<Inner<POLICY, INSTRUMENT>>,
}
impl State {
    /// Returns the snake_case name of the state: `"closed"`, `"open"` or
    /// `"half_open"`. Any payload carried by the variant is ignored.
    #[inline]
    pub fn as_str(&self) -> &'static str {
        match *self {
            State::Closed => "closed",
            State::HalfOpen(..) => "half_open",
            State::Open(..) => "open",
        }
    }
}
impl<POLICY, INSTRUMENT> Debug for StateMachine<POLICY, INSTRUMENT> {
    /// Formats the machine as a struct with a single `state` field holding the
    /// name of the current state. The policy and the instrument are not printed,
    /// so neither of them has to implement `Debug`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // The guard stays alive until the end of the function, so the state
        // cannot change while it is being written out.
        let guard = self.inner.shared.lock();
        let state_name: &'static str = guard.state.as_str();

        let mut builder = f.debug_struct("StateMachine");
        builder.field("state", &state_name);
        builder.finish()
    }
}
impl<POLICY, INSTRUMENT> Clone for StateMachine<POLICY, INSTRUMENT> {
fn clone(&self) -> Self {
StateMachine {
inner: self.inner.clone(),
}
}
}
impl<POLICY: FailurePolicy> Shared<POLICY> {
    /// Switches to `Open`. The deadline is the current clock reading plus
    /// `delay`; `delay` itself is stored next to it.
    #[inline]
    fn transit_to_open(&mut self, delay: Duration) {
        self.state = State::Open(clock::now() + delay, delay);
    }

    /// Switches to `HalfOpen`, remembering `delay`.
    #[inline]
    fn transit_to_half_open(&mut self, delay: Duration) {
        self.state = State::HalfOpen(delay);
    }

    /// Switches to `Closed` and then tells the failure policy that it has been
    /// revived.
    #[inline]
    fn transit_to_closed(&mut self) {
        self.state = State::Closed;
        self.failure_policy.revived();
    }
}
impl<POLICY, INSTRUMENT> StateMachine<POLICY, INSTRUMENT>
where
    POLICY: FailurePolicy,
    INSTRUMENT: Instrument,
{
    /// Creates a state machine that starts in the closed state.
    ///
    /// `instrument.on_closed()` is called once, before the machine is assembled,
    /// to report the initial state.
    pub fn new(failure_policy: POLICY, instrument: INSTRUMENT) -> Self {
        instrument.on_closed();

        StateMachine {
            inner: Arc::new(Inner {
                shared: Mutex::new(Shared {
                    state: State::Closed,
                    failure_policy,
                }),
                instrument,
            }),
        }
    }

    /// Tells whether a call may be attempted right now.
    ///
    /// * Closed and half-open: always `true`.
    /// * Open: once the clock is strictly past the stored deadline the machine
    ///   becomes half-open, `on_half_open` is reported and `true` is returned;
    ///   before that `on_call_rejected` is reported and `false` is returned.
    ///
    /// Instrument callbacks are fired after the lock has been released.
    pub fn is_call_permitted(&self) -> bool {
        let mut instrument: u8 = 0;

        let res = {
            let mut shared = self.inner.shared.lock();

            match shared.state {
                State::Closed | State::HalfOpen(_) => true,
                State::Open(until, delay) => {
                    if clock::now() > until {
                        shared.transit_to_half_open(delay);
                        instrument |= ON_HALF_OPEN;
                        true
                    } else {
                        instrument |= ON_REJECTED;
                        false
                    }
                }
            }
        };

        if instrument & ON_HALF_OPEN != 0 {
            self.inner.instrument.on_half_open();
        }

        if instrument & ON_REJECTED != 0 {
            self.inner.instrument.on_call_rejected();
        }

        res
    }

    /// Forces the machine back into the closed state.
    ///
    /// An open or half-open machine is closed (which also notifies the failure
    /// policy through `revived`) and `on_closed` is reported. A machine that is
    /// already closed is left untouched and nothing is reported.
    ///
    /// As in the other methods, the instrument is notified only after the lock
    /// has been released, so a callback never runs while the state is locked.
    pub fn reset(&self) {
        let mut instrument: u8 = 0;

        {
            let mut shared = self.inner.shared.lock();

            match shared.state {
                State::Closed => {}
                State::Open(..) | State::HalfOpen(_) => {
                    shared.transit_to_closed();
                    instrument |= ON_CLOSED;
                }
            }
        }

        if instrument & ON_CLOSED != 0 {
            self.inner.instrument.on_closed();
        }
    }

    /// Records a successful call.
    ///
    /// A half-open machine is closed and `on_closed` is reported. The failure
    /// policy's `record_success` is invoked in every state.
    pub fn on_success(&self) {
        let mut instrument: u8 = 0;

        {
            let mut shared = self.inner.shared.lock();

            if let State::HalfOpen(_) = shared.state {
                shared.transit_to_closed();
                instrument |= ON_CLOSED;
            }

            shared.failure_policy.record_success();
        }

        if instrument & ON_CLOSED != 0 {
            self.inner.instrument.on_closed();
        }
    }

    /// Records a failed call.
    ///
    /// * Closed: the failure policy is consulted; if it returns a delay, the
    ///   machine opens for that delay and `on_open` is reported.
    /// * Half-open: the machine always reopens and `on_open` is reported. The
    ///   delay is the one returned by the policy or, if it returns `None`, the
    ///   delay carried by the half-open state.
    /// * Open: the failure is ignored and the policy is not consulted.
    pub fn on_error(&self) {
        let mut instrument: u8 = 0;

        {
            let mut shared = self.inner.shared.lock();

            match shared.state {
                State::Closed => {
                    if let Some(delay) = shared.failure_policy.mark_dead_on_failure() {
                        shared.transit_to_open(delay);
                        instrument |= ON_OPEN;
                    }
                }
                State::HalfOpen(delay_in_half_open) => {
                    // Prefer the policy's next delay; otherwise reuse the one
                    // the machine was open for last time.
                    let delay = shared
                        .failure_policy
                        .mark_dead_on_failure()
                        .unwrap_or(delay_in_half_open);
                    shared.transit_to_open(delay);
                    instrument |= ON_OPEN;
                }
                State::Open(..) => {}
            }
        }

        if instrument & ON_OPEN != 0 {
            self.inner.instrument.on_open();
        }
    }
}
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::{Arc, Mutex};

    use super::super::backoff;
    use super::super::failure_policy::consecutive_failures;
    use super::*;

    /// Drives one state machine through closed, open and half-open (twice), back
    /// to closed, to open again and finally through `reset`, checking what the
    /// instrument was told after every step.
    #[test]
    fn state_machine() {
        clock::freeze(move |time| {
            let observer = Observer::new();
            let policy =
                consecutive_failures(3, backoff::exponential(5.seconds(), 300.seconds()));
            let breaker = StateMachine::new(policy, observer.clone());

            assert!(breaker.is_call_permitted());

            // Successful calls leave the machine closed.
            for _ in 0..10 {
                assert!(breaker.is_call_permitted());
                breaker.on_success();
                assert!(observer.is_closed());
            }

            // Two failures are not enough to open it (presumably the `3` given to
            // `consecutive_failures` is the threshold).
            for _ in 0..2 {
                assert!(breaker.is_call_permitted());
                breaker.on_error();
                assert!(observer.is_closed());
            }

            // The third failure in a row opens it.
            assert!(breaker.is_call_permitted());
            breaker.on_error();
            assert!(observer.is_open());

            // While open, every call is denied and each denial is reported.
            for expected_rejections in 1..=10 {
                assert!(!breaker.is_call_permitted());
                assert_eq!(expected_rejections, observer.rejected_calls());
            }

            // 2s later: still open.
            time.advance(2.seconds());
            assert!(!breaker.is_call_permitted());
            assert!(observer.is_open());

            clock::now();

            // 4s more (6s in total): the delay has passed, the machine is half-open.
            time.advance(4.seconds());
            assert!(breaker.is_call_permitted());
            assert!(observer.is_half_open());

            // A failure while half-open opens it again.
            breaker.on_error();
            assert!(!breaker.is_call_permitted());
            assert!(observer.is_open());

            // 5s are no longer enough (presumably the back-off has grown) ...
            time.advance(5.seconds());
            assert!(!breaker.is_call_permitted());
            assert!(observer.is_open());

            // ... but after 6s more the machine is half-open again.
            time.advance(6.seconds());
            assert!(breaker.is_call_permitted());
            assert!(observer.is_half_open());

            // A success while half-open closes it.
            breaker.on_success();
            assert!(breaker.is_call_permitted());
            assert!(observer.is_closed());

            for _ in 0..10 {
                assert!(breaker.is_call_permitted());
                breaker.on_success();
            }

            // Three failures open it once more; `reset` closes it right away.
            for _ in 0..3 {
                breaker.on_error();
            }
            assert!(observer.is_open());

            breaker.reset();
            assert!(observer.is_closed());
        });
    }

    /// State most recently reported to the [`Observer`].
    #[derive(Debug)]
    enum State {
        Open,
        HalfOpen,
        Closed,
    }

    /// Test instrument that remembers the last reported state and counts the
    /// rejected calls. Clones share both values.
    #[derive(Clone, Debug)]
    struct Observer {
        state: Arc<Mutex<State>>,
        rejected_calls: Arc<AtomicUsize>,
    }

    impl Observer {
        /// Starts out as closed with no rejected calls.
        fn new() -> Self {
            let state = Arc::new(Mutex::new(State::Closed));
            let rejected_calls = Arc::new(AtomicUsize::new(0));
            Observer {
                state,
                rejected_calls,
            }
        }

        fn is_closed(&self) -> bool {
            let current = self.state.lock().unwrap();
            matches!(*current, State::Closed)
        }

        fn is_open(&self) -> bool {
            let current = self.state.lock().unwrap();
            matches!(*current, State::Open)
        }

        fn is_half_open(&self) -> bool {
            let current = self.state.lock().unwrap();
            matches!(*current, State::HalfOpen)
        }

        /// Number of `on_call_rejected` notifications received so far.
        fn rejected_calls(&self) -> usize {
            self.rejected_calls.load(Ordering::SeqCst)
        }
    }

    impl Instrument for Observer {
        fn on_call_rejected(&self) {
            self.rejected_calls.fetch_add(1, Ordering::SeqCst);
        }

        fn on_open(&self) {
            println!("state=open");
            *self.state.lock().unwrap() = State::Open;
        }

        fn on_half_open(&self) {
            println!("state=half_open");
            *self.state.lock().unwrap() = State::HalfOpen;
        }

        fn on_closed(&self) {
            println!("state=closed");
            *self.state.lock().unwrap() = State::Closed;
        }
    }

    /// Lets the test write durations as `5.seconds()`.
    trait IntoDuration {
        fn seconds(self) -> Duration;
    }

    impl IntoDuration for u64 {
        fn seconds(self) -> Duration {
            Duration::from_secs(self)
        }
    }
}