use std::time::Duration;
use futures::Stream;
use futures::StreamExt;
use futures::stream;
use schemars::JsonSchema;
use serde::Deserialize;
use serde::Serialize;
pub(crate) mod reload;
pub(crate) use reload::ReloadState;
use crate::router::Event;
// Settings struct holding two optional reload periods. Elsewhere in this file the
// value found at `config.experimental_chaos` is handed to `ReloadState::set_periods`.
//
// NOTE(review): plain `//` comments are used on purpose. This type derives `JsonSchema`,
// and `///` doc comments presumably end up as descriptions in the generated schema;
// confirm that is wanted before converting these to doc comments.
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema)]
// Unknown keys are rejected when deserializing.
#[serde(deny_unknown_fields)]
// Keys that are absent take their value from `Config::default()`, i.e. `None` for both fields.
#[serde(default)]
pub(crate) struct Config {
// Presumably the interval between forced schema reloads, with `None` meaning "disabled";
// the consuming logic is not in this file (see `reload`), so confirm there.
// (De)serialized through `humantime_serde`; described to the schema as an optional string.
#[serde(with = "humantime_serde")]
#[schemars(with = "Option<String>")]
pub(crate) force_schema_reload: Option<Duration>,
// Presumably the interval between forced configuration reloads, with `None` meaning
// "disabled"; confirm in `reload`. Same (de)serialization and schema handling as above.
#[serde(with = "humantime_serde")]
#[schemars(with = "Option<String>")]
pub(crate) force_config_reload: Option<Duration>,
}
impl Config {
    /// Test-only constructor that sets both reload periods explicitly.
    ///
    /// Either argument may be `None`; the values are stored as given.
    #[cfg(test)]
    fn new(force_schema_reload: Option<Duration>, force_config_reload: Option<Duration>) -> Self {
        Config {
            force_schema_reload: force_schema_reload,
            force_config_reload: force_config_reload,
        }
    }
}
/// Extension methods for any stream of [`Event`]s that merge in the events produced
/// by a [`ReloadState`].
pub(crate) trait ChaosEventStream: Stream<Item = Event> + Sized {
    /// Merges this stream with the stream obtained from `reload_source`.
    ///
    /// Every upstream event is passed through unchanged. Before it is yielded:
    /// - an `Event::UpdateSchema` is recorded via `update_last_schema`;
    /// - an `Event::UpdateConfiguration` first applies the periods from its
    ///   `experimental_chaos` section and is then recorded via
    ///   `update_last_configuration`.
    ///
    /// All other events are forwarded without touching the reload state.
    fn with_chaos_reload_state(self, reload_source: ReloadState) -> impl Stream<Item = Event> {
        // `into_stream` below consumes `reload_source`, so the observer closure needs
        // its own clone.
        // NOTE(review): presumably the clone shares state with the original, otherwise
        // these updates would not reach the reload stream — confirm in `reload`.
        let observer = reload_source.clone();

        let observed_upstream = self.inspect(move |event| {
            if let Event::UpdateSchema(schema_state) = event {
                observer.update_last_schema(schema_state);
            } else if let Event::UpdateConfiguration(config) = event {
                // Keep this order: periods first, then the stored configuration.
                observer.set_periods(&config.experimental_chaos);
                observer.update_last_configuration(config);
            }
        });

        let reload_events = reload_source.into_stream();
        stream::select(observed_upstream, reload_events)
    }

    /// Same as [`Self::with_chaos_reload_state`], starting from a default [`ReloadState`].
    fn with_chaos_reload(self) -> impl Stream<Item = Event> {
        let fresh_state = ReloadState::default();
        self.with_chaos_reload_state(fresh_state)
    }
}
/// Blanket implementation: every stream of [`Event`]s gets the chaos-reload methods.
impl<S: Stream<Item = Event>> ChaosEventStream for S {}