use std::sync::Arc;
use std::task::Poll;
use std::time::Duration;
use futures::prelude::*;
use parking_lot::Mutex;
use tokio_util::time::DelayQueue;
use super::Config;
use crate::configuration::Configuration;
use crate::router::Event;
use crate::uplink::schema::SchemaState;
/// Kind of forced reload that can be scheduled on the chaos timer queue.
///
/// The enum carries no data, so it derives the cheap common traits (`Copy`,
/// `Debug`, equality) in addition to `Clone`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ChaosEvent {
    /// Timer for replaying the last known schema.
    Schema,
    /// Timer for replaying the last known configuration.
    Configuration,
}
/// Mutable state guarded by the mutex inside [`ReloadState`].
#[derive(Default)]
struct ReloadStateInner {
    /// Pending forced-reload timers, one entry per scheduled [`ChaosEvent`].
    queue: DelayQueue<ChaosEvent>,

    /// Interval between forced schema reloads; `None` means none is scheduled.
    force_schema_reload_period: Option<Duration>,

    /// Interval between forced configuration reloads; `None` means none is scheduled.
    force_config_reload_period: Option<Duration>,

    /// Most recent schema recorded via `update_last_schema`, if any.
    last_schema: Option<SchemaState>,

    /// Most recent configuration recorded via `update_last_configuration`, if any.
    last_configuration: Option<Arc<Configuration>>,
}
/// Handle to the chaos reload state.
///
/// The state lives behind an `Arc<Mutex<_>>`, so every clone of this handle
/// observes and mutates the same underlying data.
#[derive(Clone, Default)]
pub(crate) struct ReloadState {
    /// Shared, lock-protected state.
    inner: Arc<Mutex<ReloadStateInner>>,
}
impl ReloadState {
    /// Stores the forced-reload periods from `config` and reschedules the timers.
    ///
    /// Every timer currently in the queue is discarded first; a fresh timer is
    /// then queued for each period that is set in `config`.
    pub(crate) fn set_periods(&self, config: &Config) {
        let mut inner = self.inner.lock();

        // Timers scheduled under the previous periods must not fire any more.
        inner.queue.clear();

        inner.force_schema_reload_period = config.force_schema_reload;
        inner.force_config_reload_period = config.force_config_reload;

        if let Some(period) = config.force_schema_reload {
            inner.queue.insert(ChaosEvent::Schema, period);
        }
        if let Some(period) = config.force_config_reload {
            inner.queue.insert(ChaosEvent::Configuration, period);
        }
    }

    /// Records a copy of `schema` as the schema to replay on forced schema reloads.
    pub(crate) fn update_last_schema(&self, schema: &SchemaState) {
        let mut inner = self.inner.lock();
        inner.last_schema = Some(schema.clone());
    }

    /// Records `config` as the configuration to replay on forced configuration reloads.
    ///
    /// Only the `Arc` is cloned here; the configuration itself is shared.
    pub(crate) fn update_last_configuration(&self, config: &Arc<Configuration>) {
        let mut inner = self.inner.lock();
        inner.last_configuration = Some(Arc::clone(config));
    }

    /// Converts this handle into a stream of forced reload events.
    ///
    /// Each time a queued timer expires, the timer is rescheduled with the
    /// currently configured period (if any) and the matching event is produced:
    ///
    /// * schema timers yield [`Event::UpdateSchema`] carrying the last recorded
    ///   schema with a `# Chaos reload timestamp: <unix seconds>` line prepended
    ///   to its SDL;
    /// * configuration timers yield [`Event::UpdateConfiguration`] carrying a
    ///   deep clone of the last recorded configuration in a new `Arc`.
    ///
    /// When nothing has been recorded for the expired timer, no event is
    /// produced for that expiry. The stream never yields `None`.
    pub(crate) fn into_stream(self) -> impl Stream<Item = Event> {
        futures::stream::poll_fn(move |cx| {
            let mut inner = self.inner.lock();
            match inner.queue.poll_expired(cx) {
                Poll::Ready(Some(expired)) => {
                    let event = match expired.into_inner() {
                        ChaosEvent::Schema => {
                            // Reschedule first so the reload keeps recurring.
                            if let Some(period) = inner.force_schema_reload_period {
                                inner.queue.insert(ChaosEvent::Schema, period);
                            }
                            inner.last_schema.clone().map(|mut schema| {
                                let timestamp = std::time::SystemTime::now()
                                    .duration_since(std::time::UNIX_EPOCH)
                                    .unwrap_or_default()
                                    .as_secs();
                                // Prepend a comment so the replayed SDL text differs
                                // from the recorded one.
                                schema.sdl = format!(
                                    "# Chaos reload timestamp: {}\n{}",
                                    timestamp, schema.sdl
                                );
                                Event::UpdateSchema(schema)
                            })
                        }
                        ChaosEvent::Configuration => {
                            // Reschedule first so the reload keeps recurring.
                            if let Some(period) = inner.force_config_reload_period {
                                inner.queue.insert(ChaosEvent::Configuration, period);
                            }
                            // Deep-clone into a fresh `Arc` so the emitted value is a
                            // distinct allocation from the recorded one.
                            inner
                                .last_configuration
                                .as_deref()
                                .cloned()
                                .map(|config| Event::UpdateConfiguration(Arc::new(config)))
                        }
                    };

                    match event {
                        Some(event) => Poll::Ready(Some(event)),
                        None => {
                            // Nothing to replay for this expiry. The replacement
                            // timer was inserted after `poll_expired` returned, so
                            // this task is not guaranteed to be registered for any
                            // remaining deadline. Request an immediate re-poll so
                            // the next `poll_expired` call arms the timer instead
                            // of stalling the stream.
                            cx.waker().wake_by_ref();
                            Poll::Pending
                        }
                    }
                }
                // An empty queue must not terminate the stream: timers may be
                // added later through `set_periods`.
                // NOTE(review): this relies on the queue waking the waker handed
                // to `poll_expired` when an item is inserted later; confirm
                // against the tokio-util version in use.
                Poll::Ready(None) => Poll::Pending,
                Poll::Pending => Poll::Pending,
            }
        })
    }
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use futures::StreamExt;
    use futures::pin_mut;

    use super::*;
    use crate::configuration::Configuration;
    use crate::plugins::chaos::ChaosEventStream;

    /// Builds the small schema used as input by most tests.
    fn create_test_schema() -> SchemaState {
        let sdl = "type Query { hello: String }".to_string();
        let launch_id = Some("test-launch".to_string());
        SchemaState { sdl, launch_id }
    }

    /// Builds a default configuration wrapped in a fresh `Arc`.
    fn create_test_config() -> Arc<Configuration> {
        let configuration = Configuration::default();
        Arc::new(configuration)
    }

    /// A default state has no periods and no recorded schema or configuration.
    #[test]
    fn test_reload_source_default() {
        let state = ReloadState::default();
        let guard = state.inner.lock();

        assert!(guard.force_schema_reload_period.is_none());
        assert!(guard.force_config_reload_period.is_none());
        assert!(guard.last_schema.is_none());
        assert!(guard.last_configuration.is_none());
    }

    /// `set_periods` stores both periods exactly as configured.
    #[tokio::test]
    async fn test_set_periods_configures_reload_intervals() {
        let (schema_period, config_period) = (Duration::from_secs(5), Duration::from_secs(10));
        let state = ReloadState::default();

        state.set_periods(&Config::new(Some(schema_period), Some(config_period)));

        let guard = state.inner.lock();
        assert_eq!(guard.force_schema_reload_period, Some(schema_period));
        assert_eq!(guard.force_config_reload_period, Some(config_period));
    }

    /// A second `set_periods` call replaces the timers queued by the first.
    #[tokio::test]
    async fn test_set_periods_clears_existing_queue() {
        let state = ReloadState::default();

        // First configuration: schema reload only.
        state.set_periods(&Config::new(Some(Duration::from_secs(5)), None));
        assert_eq!(state.inner.lock().queue.len(), 1);

        // Second configuration: configuration reload only.
        let config_period = Duration::from_secs(10);
        state.set_periods(&Config::new(None, Some(config_period)));

        let guard = state.inner.lock();
        assert_eq!(guard.force_schema_reload_period, None);
        assert_eq!(guard.force_config_reload_period, Some(config_period));
        assert_eq!(guard.queue.len(), 1);
    }

    /// With no periods configured, nothing is queued.
    #[test]
    fn test_set_periods_with_none_values() {
        let state = ReloadState::default();

        state.set_periods(&Config::new(None, None));

        let guard = state.inner.lock();
        assert!(guard.force_schema_reload_period.is_none());
        assert!(guard.force_config_reload_period.is_none());
        assert!(guard.queue.is_empty());
    }

    /// `update_last_schema` stores a copy of the given schema.
    #[test]
    fn test_update_last_schema() {
        let state = ReloadState::default();
        let schema = create_test_schema();

        // The temporary guard is released at the end of this statement.
        assert!(state.inner.lock().last_schema.is_none());

        state.update_last_schema(&schema);

        let guard = state.inner.lock();
        let stored = guard.last_schema.as_ref().unwrap();
        assert_eq!(stored.sdl, schema.sdl);
        assert_eq!(stored.launch_id, schema.launch_id);
    }

    /// `update_last_configuration` stores the very same `Arc` it was given.
    #[test]
    fn test_update_last_configuration() {
        let state = ReloadState::default();
        let config = create_test_config();

        // The temporary guard is released at the end of this statement.
        assert!(state.inner.lock().last_configuration.is_none());

        state.update_last_configuration(&config);

        let guard = state.inner.lock();
        let stored = guard.last_configuration.as_ref().unwrap();
        assert!(Arc::ptr_eq(stored, &config));
    }

    /// Later updates overwrite the values recorded by earlier ones.
    #[test]
    fn test_update_methods_replace_previous_values() {
        let state = ReloadState::default();

        let first_schema = create_test_schema();
        let first_config = create_test_config();
        state.update_last_schema(&first_schema);
        state.update_last_configuration(&first_config);

        let second_schema = SchemaState {
            sdl: "type Query { goodbye: String }".to_string(),
            launch_id: Some("new-launch".to_string()),
        };
        let second_config = Arc::new(Configuration::default());
        state.update_last_schema(&second_schema);
        state.update_last_configuration(&second_config);

        let guard = state.inner.lock();

        let stored_schema = guard.last_schema.as_ref().unwrap();
        assert_eq!(stored_schema.sdl, second_schema.sdl);
        assert_eq!(stored_schema.launch_id, second_schema.launch_id);

        let stored_config = guard.last_configuration.as_ref().unwrap();
        assert!(Arc::ptr_eq(stored_config, &second_config));
        assert!(!Arc::ptr_eq(stored_config, &first_config));
    }

    /// Without any period configured the stream stays pending.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_stream_with_no_chaos_periods_is_empty() {
        let mut events = ReloadState::default().into_stream();

        let outcome = tokio::time::timeout(Duration::from_millis(50), events.next()).await;

        // An elapsed timeout means no event was produced.
        assert!(outcome.is_err());
    }

    /// A firing timer produces nothing when no schema has been recorded.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_stream_skips_events_when_no_data_available() {
        let state = ReloadState::default();
        state.set_periods(&Config::new(Some(Duration::from_millis(10)), None));
        let mut events = state.into_stream();

        let outcome = tokio::time::timeout(Duration::from_millis(50), events.next()).await;

        // An elapsed timeout means no event was produced.
        assert!(outcome.is_err());
    }

    /// A schema timer replays the recorded schema with the chaos marker prepended.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_schema_reload_stream_generates_events() {
        let state = ReloadState::default();
        let schema = create_test_schema();
        state.update_last_schema(&schema);
        state.set_periods(&Config::new(Some(Duration::from_millis(10)), None));
        let mut events = state.into_stream();

        let next = tokio::time::timeout(Duration::from_millis(100), events.next()).await;
        let event = next
            .expect("Should receive event within timeout")
            .expect("Stream should produce an event");

        match event {
            Event::UpdateSchema(reloaded) => {
                assert!(reloaded.sdl.contains("# Chaos reload timestamp:"));
                assert!(reloaded.sdl.contains(&schema.sdl));
                assert_eq!(reloaded.launch_id, schema.launch_id);
            }
            other => panic!("Expected UpdateSchema event, got {:?}", other),
        }
    }

    /// A configuration timer replays the recorded configuration in a new `Arc`.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_config_reload_stream_generates_events() {
        let state = ReloadState::default();
        let config = create_test_config();
        state.update_last_configuration(&config);
        state.set_periods(&Config::new(None, Some(Duration::from_millis(10))));
        let mut events = state.into_stream();

        let next = tokio::time::timeout(Duration::from_millis(100), events.next()).await;
        let event = next
            .expect("Should receive event within timeout")
            .expect("Stream should produce an event");

        match event {
            Event::UpdateConfiguration(reloaded) => {
                // The replayed configuration must be a distinct allocation.
                assert!(!Arc::ptr_eq(&reloaded, &config));
            }
            other => panic!("Expected UpdateConfiguration event, got {:?}", other),
        }
    }

    /// Upstream schema events pass through unchanged and are recorded.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_reloadable_event_stream_captures_schema_events() {
        let state = ReloadState::default();
        let schema = create_test_schema();

        let upstream_event = Event::UpdateSchema(schema.clone());
        let upstream = futures::stream::once(async move { upstream_event });
        let stream = upstream.with_chaos_reload_state(state.clone());
        pin_mut!(stream);

        let event = stream.next().await.unwrap();
        if let Event::UpdateSchema(received) = event {
            assert_eq!(received.sdl, schema.sdl);
            assert_eq!(received.launch_id, schema.launch_id);
        } else {
            panic!("Expected UpdateSchema event");
        }

        let guard = state.inner.lock();
        let stored = guard.last_schema.as_ref().unwrap();
        assert_eq!(stored.sdl, schema.sdl);
        assert_eq!(stored.launch_id, schema.launch_id);
    }

    /// Upstream configuration events are recorded and their chaos periods applied.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_reloadable_event_stream_configures_and_captures_config_events() {
        let state = ReloadState::default();
        let schema_period = Duration::from_secs(30);
        let config_arc = Arc::new(Configuration {
            experimental_chaos: Config::new(Some(schema_period), None),
            ..Default::default()
        });

        let upstream_event = Event::UpdateConfiguration(config_arc.clone());
        let upstream = futures::stream::once(async move { upstream_event });
        let stream = upstream.with_chaos_reload_state(state.clone());
        pin_mut!(stream);

        let event = stream.next().await.unwrap();
        if let Event::UpdateConfiguration(received) = event {
            assert!(Arc::ptr_eq(&received, &config_arc));
        } else {
            panic!("Expected UpdateConfiguration event");
        }

        let guard = state.inner.lock();
        let stored = guard.last_configuration.as_ref().unwrap();
        assert!(Arc::ptr_eq(stored, &config_arc));
        assert_eq!(guard.force_schema_reload_period, Some(schema_period));
        assert!(guard.force_config_reload_period.is_none());
    }

    /// Events other than schema/configuration updates leave the state untouched.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_reloadable_event_stream_ignores_other_events() {
        let state = ReloadState::default();

        let upstream = futures::stream::once(async move { Event::NoMoreSchema });
        let stream = upstream.with_chaos_reload_state(state.clone());
        pin_mut!(stream);

        let event = stream.next().await.unwrap();
        assert!(
            matches!(event, Event::NoMoreSchema),
            "Expected NoMoreSchema event"
        );

        let guard = state.inner.lock();
        assert!(guard.last_schema.is_none());
        assert!(guard.last_configuration.is_none());
        assert!(guard.force_schema_reload_period.is_none());
        assert!(guard.force_config_reload_period.is_none());
    }

    /// The combined stream yields the upstream event first, then a chaos replay.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_reloadable_event_stream_merges_upstream_and_chaos_events() {
        let state = ReloadState::default();
        let schema = create_test_schema();
        state.update_last_schema(&schema);
        state.set_periods(&Config::new(Some(Duration::from_millis(10)), None));

        let upstream_event = Event::UpdateSchema(schema.clone());
        let upstream = futures::stream::once(async move { upstream_event });
        let stream = upstream.with_chaos_reload_state(state);
        pin_mut!(stream);

        // The upstream event arrives untouched.
        let first_event = stream.next().await.unwrap();
        if let Event::UpdateSchema(received) = first_event {
            assert_eq!(received.sdl, schema.sdl);
            assert!(!received.sdl.contains("# Chaos reload timestamp:"));
        } else {
            panic!("Expected UpdateSchema event");
        }

        // The chaos replay follows and carries the timestamp marker.
        let next = tokio::time::timeout(Duration::from_millis(100), stream.next()).await;
        let second_event = next
            .expect("Should receive chaos event within timeout")
            .expect("Stream should produce an event");
        if let Event::UpdateSchema(replayed) = second_event {
            assert!(replayed.sdl.contains("# Chaos reload timestamp:"));
            assert!(replayed.sdl.contains(&schema.sdl));
        } else {
            panic!("Expected UpdateSchema chaos event");
        }
    }
}