use crate::handler::{Error, FinalizeResponse, Handler, SyncRequest, SyncResponse};
use serde_json::Value;
use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::{Arc, Mutex};
use std::time::Duration;
#[derive(Debug, Clone)]
pub struct BackoffConfig {
pub initial_interval: Duration,
pub max_interval: Duration,
pub give_up_after: Option<Duration>,
pub multiplier: f64,
pub randomization_factor: f64,
}
impl Default for BackoffConfig {
fn default() -> BackoffConfig {
BackoffConfig {
initial_interval: Duration::from_millis(500),
max_interval: Duration::from_secs(600),
give_up_after: None,
multiplier: 1.5,
randomization_factor: 0.5,
}
}
}
impl BackoffConfig {
pub fn fixed_interval(interval: Duration) -> BackoffConfig {
BackoffConfig {
initial_interval: interval,
max_interval: interval,
give_up_after: None,
multiplier: 1.0,
randomization_factor: 0.0,
}
}
pub fn never_retry() -> BackoffConfig {
BackoffConfig {
initial_interval: Duration::from_millis(500),
give_up_after: Some(Duration::from_millis(0)),
randomization_factor: 0.0,
..Default::default()
}
}
pub fn disable_randomization(mut self) -> Self {
self.randomization_factor = 0.0;
self
}
fn new_backoff(&self) -> backoff::ExponentialBackoff {
let start_time = if Some(Duration::from_millis(0)) == self.give_up_after {
std::time::Instant::now() - Duration::from_millis(500)
} else {
std::time::Instant::now()
};
backoff::ExponentialBackoff {
initial_interval: self.initial_interval,
current_interval: self.initial_interval,
max_interval: self.max_interval,
multiplier: self.multiplier,
max_elapsed_time: self.give_up_after,
randomization_factor: self.randomization_factor,
start_time,
..Default::default()
}
}
}
#[derive(Debug, Default)]
pub struct ErrorBackoff {
backoff_state: Arc<Mutex<HashMap<String, backoff::ExponentialBackoff>>>,
backoff_config: BackoffConfig,
}
impl ErrorBackoff {
pub fn new(backoff_config: BackoffConfig) -> ErrorBackoff {
ErrorBackoff {
backoff_config,
backoff_state: Arc::new(Mutex::new(HashMap::new())),
}
}
pub fn next_error_backoff(&self, req: &SyncRequest) -> Option<Duration> {
use backoff::backoff::Backoff;
let ErrorBackoff {
ref backoff_state,
ref backoff_config,
} = *self;
let uid = req.parent.uid();
let mut backoffs = backoff_state.lock().unwrap();
if !backoffs.contains_key(uid) {
let backoff = backoff_config.new_backoff();
backoffs.insert(uid.to_owned(), backoff);
}
let bo = backoffs.get_mut(uid).unwrap();
bo.next_backoff()
}
pub fn reset_backoff(&self, req: &SyncRequest) {
let uid = req.parent.uid();
let mut backoffs = self.backoff_state.lock().unwrap();
backoffs.remove(uid);
}
}
#[derive(Debug, PartialEq, Clone)]
pub enum HandlerResult<V, E> {
ValidationFailed(E),
SyncFailed(V, E),
FinalizeFailed(E),
SyncSuccess(V),
FinalizeSuccess,
}
impl<V, E> HandlerResult<V, E> {
pub fn is_success(&self) -> bool {
match self {
HandlerResult::SyncSuccess(_) => true,
HandlerResult::FinalizeSuccess => true,
_ => false,
}
}
pub fn is_error(&self) -> bool {
!self.is_success()
}
pub fn into_error(self) -> Option<E> {
match self {
HandlerResult::ValidationFailed(e) => Some(e),
HandlerResult::SyncFailed(_, e) => Some(e),
HandlerResult::FinalizeFailed(e) => Some(e),
_ => None,
}
}
pub fn into_validated(self) -> Option<V> {
match self {
HandlerResult::SyncFailed(v, _) => Some(v),
HandlerResult::SyncSuccess(v) => Some(v),
_ => None,
}
}
}
pub trait FailableHandler: Send + Sync + 'static {
type Validated;
type Error: Debug;
type Status: serde::Serialize;
fn validate(&self, request: &SyncRequest) -> Result<Self::Validated, Self::Error>;
fn sync_children(
&self,
validated: &mut Self::Validated,
req: &SyncRequest,
) -> Result<Vec<Value>, Self::Error>;
fn finalize(&self, _req: &SyncRequest) -> Result<(), Self::Error> {
Ok(())
}
fn determine_status(
&self,
req: &SyncRequest,
result: HandlerResult<Self::Validated, Self::Error>,
) -> Self::Status;
}
impl<Syncf, Sf, Status, E> FailableHandler for (Syncf, Sf)
where
E: Debug,
Status: serde::Serialize + serde::de::DeserializeOwned,
Syncf: Fn(&SyncRequest) -> Result<Vec<Value>, E> + Send + Sync + 'static,
Sf: Fn(&SyncRequest, Option<E>) -> Status + Send + Sync + 'static,
{
type Validated = ();
type Error = E;
type Status = Status;
fn validate(&self, _request: &SyncRequest) -> Result<Self::Validated, Self::Error> {
Ok(())
}
fn sync_children(
&self,
_: &mut Self::Validated,
req: &SyncRequest,
) -> Result<Vec<Value>, Self::Error> {
(self.0)(req)
}
fn determine_status(&self, req: &SyncRequest, result: HandlerResult<(), E>) -> Self::Status {
let error = match result {
HandlerResult::ValidationFailed(e) => Some(e),
HandlerResult::SyncFailed(_, e) => Some(e),
_ => None,
};
(self.1)(req, error)
}
}
pub struct DefaultFailableHandler<H: FailableHandler> {
inner: H,
error_backoff: ErrorBackoff,
regular_resync: Option<Duration>,
}
impl<F: FailableHandler> DefaultFailableHandler<F> {
pub fn wrap(failable: F) -> DefaultFailableHandler<F> {
DefaultFailableHandler::new(failable, BackoffConfig::default(), None)
}
pub fn new(
failable: F,
backoff_config: BackoffConfig,
regular_resync: Option<Duration>,
) -> DefaultFailableHandler<F> {
DefaultFailableHandler {
inner: failable,
error_backoff: ErrorBackoff::new(backoff_config),
regular_resync,
}
}
pub fn with_regular_resync(mut self, resync_interval: Duration) -> Self {
self.regular_resync = Some(resync_interval);
self
}
pub fn with_backoff(mut self, backoff_config: BackoffConfig) -> Self {
self.error_backoff.backoff_config = backoff_config;
self
}
}
impl<H: FailableHandler> Handler for DefaultFailableHandler<H> {
fn sync(&self, request: &SyncRequest) -> Result<SyncResponse, Error> {
let mut children = Vec::new();
let sync_result = match self.inner.validate(request) {
Ok(mut validated) => {
log::debug!(
"validation succeeded for parent: {}",
request.parent.get_object_id()
);
match self.inner.sync_children(&mut validated, request) {
Ok(kids) => {
log::debug!("sync_children succeeded for parent: {} and returned {} child resources",
request.parent.get_object_id(),
kids.len());
children = kids;
HandlerResult::SyncSuccess(validated)
}
Err(err) => HandlerResult::SyncFailed(validated, err),
}
}
Err(validation_err) => HandlerResult::ValidationFailed(validation_err),
};
let resync = if sync_result.is_error() {
self.error_backoff.next_error_backoff(request)
} else {
self.error_backoff.reset_backoff(request);
self.regular_resync
};
let status = self.inner.determine_status(request, sync_result);
let status_json = serde_json::to_value(status).map_err(|err| {
log::error!(
"Failed to serialize status of parent: {}, err: {:?}",
request.parent.get_object_id(),
err
);
Error::from(err)
})?;
Ok(SyncResponse {
resync,
children,
status: status_json,
})
}
fn finalize(&self, request: &SyncRequest) -> Result<FinalizeResponse, Error> {
let result = self
.inner
.finalize(request)
.err()
.map(HandlerResult::FinalizeFailed)
.unwrap_or(HandlerResult::FinalizeSuccess);
let retry = if result.is_error() {
self.error_backoff.next_error_backoff(request)
} else {
self.error_backoff.reset_backoff(request);
None
};
let status_struct = self.inner.determine_status(request, result);
let status = serde_json::to_value(status_struct).map_err(|err| {
log::error!(
"Failed to serialize status of parent: {}, err: {:?}",
request.parent.get_object_id(),
err
);
Error::from(err)
})?;
Ok(FinalizeResponse { status, retry })
}
}
#[cfg(test)]
mod test {
use super::*;
use crate::handler::request::{test_request, SyncRequest};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
struct TestStatus {
error: Option<String>,
}
#[derive(Debug)]
struct TestError;
#[test]
fn backoff_is_reset_after_successful_sync() {
let return_error = Arc::new(AtomicBool::new(true));
let error_control = return_error.clone();
let status_error = return_error.clone();
let sync_fun = move |_req: &SyncRequest| {
if return_error.load(Ordering::SeqCst) {
Err(TestError)
} else {
Ok(Vec::<Value>::new())
}
};
let status_fun = move |_req: &SyncRequest, err: Option<TestError>| {
let expected_error = status_error.load(Ordering::SeqCst);
assert_eq!(expected_error, err.is_some());
let error = err.map(|_| "omg there was an error".to_owned());
TestStatus { error }
};
let backoff_config = BackoffConfig::default().disable_randomization();
let handler = DefaultFailableHandler::wrap((sync_fun, status_fun))
.with_backoff(backoff_config.clone());
let request = &test_request();
for i in 0..5 {
let resp = handler.sync(request).expect("handler returned err");
assert!(resp.resync.is_some());
if i == 0 {
assert_eq!(backoff_config.initial_interval, resp.resync.unwrap());
} else {
assert!(resp.resync.unwrap() > backoff_config.initial_interval);
}
assert!(resp.children.is_empty());
let expected_status = serde_json::json!({
"error": "omg there was an error"
});
assert_eq!(expected_status, resp.status);
}
error_control.store(false, Ordering::SeqCst);
let resp = handler.sync(request).expect("handler returned an error");
assert!(resp.resync.is_none());
error_control.store(true, Ordering::SeqCst);
let resp = handler.sync(request).expect("handler returned an error");
assert_eq!(Some(backoff_config.initial_interval), resp.resync);
}
}