use crate::async_error::AsyncError;
use crate::unit_tests::TestState;
use crate::{Async, SignalExt, StateStore};
use std::sync::{Arc, RwLock};
use std::time::Duration;
use tokio_util::sync::CancellationToken;
#[tokio::test]
async fn test_execute() {
let store = StateStore::new(TestState::default());
store.execute(
|| "Hello, World!".to_string(),
|state, async_data| state.set_async_data(async_data),
);
let mut state_vec = Vec::new();
store
.to_signal()
.stop_if(|state| state.data.is_complete())
.for_each(|state| {
state_vec.push(state.data);
async {}
})
.await;
assert_eq!(state_vec[0], Async::Uninitialized);
assert_eq!(state_vec[1], Async::loading(None));
assert_eq!(state_vec[2], Async::success("Hello, World!".to_string()));
}
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_execute_with_computation_join_error() {
let store = StateStore::new(TestState::default());
store.execute(
|| "Operation 1 success".to_string(),
|state, async_data| state.set_async_data(async_data),
);
tokio::time::sleep(Duration::from_secs(1)).await;
store.execute(
|| Err("Operation 2 fail"),
|state, async_data| state.set_async_data(async_data),
);
store
.to_signal()
.stop_if(|state| state.data.is_complete())
.for_each(|state| {
if state.data.is_complete() {
assert_eq!(
state.data,
Async::Success {
value: "Operation 1 success".to_string()
}
);
}
async {}
})
.await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_execute_cancelable_with_computation_join_error() {
let store = StateStore::new(TestState::default());
let token = CancellationToken::new();
store.execute_cancellable(
token.clone(),
|_| "Operation 1 success".to_string(),
|state, async_data| state.set_async_data(async_data),
);
tokio::time::sleep(Duration::from_secs(1)).await;
store.execute_cancellable(
token.clone(),
|_| Err("Operation 2 fail"),
|state, async_data| state.set_async_data(async_data),
);
store
.to_signal()
.stop_if(|state| state.data.is_complete())
.for_each(|state| {
if state.data.is_complete() {
assert_eq!(
state.data,
Async::Success {
value: "Operation 1 success".to_string()
}
);
}
async {}
})
.await;
}
#[tokio::test]
async fn test_execute_with_error() {
let store = StateStore::new(TestState::default());
store.execute(
|| Err("Operation failed"),
|state, async_data| state.set_async_data(async_data),
);
let mut state_vec = Vec::new();
store
.to_signal()
.stop_if(|state| state.data.is_complete())
.for_each(|state| {
state_vec.push(state.data);
async {}
})
.await;
assert_eq!(state_vec[0], Async::Uninitialized);
assert_eq!(state_vec[1], Async::loading(None));
assert_eq!(
state_vec[2],
Async::fail(AsyncError::error("Operation failed"), None,)
);
}
#[tokio::test]
async fn test_execute_with_none() {
let store = StateStore::new(TestState::default());
store.execute(
|| None,
|state, async_data| state.set_async_data(async_data),
);
let mut state_vec = Vec::new();
store
.to_signal()
.stop_if(|state| state.data.is_complete())
.for_each(|state| {
state_vec.push(state.data);
async {}
})
.await;
assert_eq!(state_vec[0], Async::Uninitialized);
assert_eq!(state_vec[1], Async::loading(None));
assert_eq!(state_vec[2], Async::fail_with_none(None));
}
#[tokio::test]
async fn test_execute_with_retain_success() {
let initial_state = TestState::default().set_async_data(Async::success("initial".to_string()));
let store = StateStore::new(initial_state);
store.execute_with_retain(
|| Result::<String, &str>::Ok("success".to_string()),
|state| &state.data,
|state, async_data| state.set_async_data(async_data),
);
let state_vec = Arc::new(RwLock::new(Vec::new()));
store
.to_signal()
.stop_if(|_| {
let len = state_vec.read().unwrap().len();
len >= 2
})
.for_each(|state| {
state_vec.write().unwrap().push(state.data);
async {}
})
.await;
let state_vec = state_vec
.read()
.unwrap()
.iter()
.cloned()
.collect::<Vec<_>>();
assert_eq!(state_vec[0], Async::success("initial".to_string()));
assert_eq!(state_vec[1], Async::loading(Some("initial".to_string())));
assert_eq!(state_vec[2], Async::success("success".to_string()));
}
#[tokio::test]
async fn test_execute_with_retain_fail() {
let initial_state = TestState::default().set_async_data(Async::success("initial".to_string()));
let store = StateStore::new(initial_state);
store.execute_with_retain(
|| Err("Operation failed"),
|state| &state.data,
|state, async_data| state.set_async_data(async_data),
);
let state_vec = Arc::new(RwLock::new(Vec::new()));
store
.to_signal()
.stop_if(|_| {
let len = state_vec.read().unwrap().len();
len >= 2
})
.for_each(|state| {
state_vec.write().unwrap().push(state.data);
async {}
})
.await;
let state_vec = state_vec
.read()
.unwrap()
.iter()
.cloned()
.collect::<Vec<_>>();
assert_eq!(state_vec[0], Async::success("initial".to_string()));
assert_eq!(state_vec[1], Async::loading(Some("initial".to_string())));
assert_eq!(
state_vec[2],
Async::fail_with_message("Operation failed", Some("initial".to_string()))
);
}
#[tokio::test]
async fn test_execute_cancellable_success() {
let store = StateStore::new(TestState::default());
let token = CancellationToken::new();
store.execute_cancellable(
token,
|_| {
"success".to_string()
},
|state, async_data| state.set_async_data(async_data),
);
let mut state_vec = Vec::new();
store
.to_signal()
.stop_if(|state| state.data.is_complete())
.for_each(|state| {
state_vec.push(state.data);
async {}
})
.await;
assert_eq!(state_vec[0], Async::Uninitialized);
assert_eq!(state_vec[1], Async::loading(None));
assert_eq!(state_vec[2], Async::success("success".to_string()));
}
#[tokio::test]
async fn test_execute_cancellable_cancel_inner() {
let store = StateStore::new(TestState::default());
let token = CancellationToken::new();
store.execute_cancellable(
token.clone(),
|token| {
token.cancel();
"Result".to_string()
},
|state, async_data| state.set_async_data(async_data),
);
let mut state_vec = Vec::new();
store
.to_signal()
.stop_if(|state| state.data.is_complete())
.for_each(|state| {
state_vec.push(state.data);
async {}
})
.await;
assert_eq!(state_vec[0], Async::Uninitialized);
assert_eq!(state_vec[1], Async::loading(None));
assert_eq!(state_vec[2], Async::fail_with_cancelled(None));
}
#[tokio::test]
async fn test_execute_cancellable_cancel_outer() {
let store = StateStore::new(TestState::default());
let token = CancellationToken::new();
store.execute_cancellable(
token.clone(),
|_| "Result".to_string(),
|state, async_data| state.set_async_data(async_data),
);
token.cancel();
let mut state_vec = Vec::new();
store
.to_signal()
.stop_if(|state| state.data.is_complete())
.for_each(|state| {
state_vec.push(state.data);
async {}
})
.await;
assert_eq!(state_vec[0], Async::Uninitialized);
assert_eq!(state_vec[1], Async::loading(None));
assert_eq!(state_vec[2], Async::fail_with_cancelled(None));
}
#[tokio::test]
async fn test_execute_cancellable_with_retain_success() {
let initial_state = TestState::default().set_async_data(Async::success("initial".to_string()));
let store = StateStore::new(initial_state);
let token = CancellationToken::new();
store.execute_cancellable_with_retain(
token.clone(),
|_| {
"success".to_string()
},
|state| &state.data,
|state, async_data| state.set_async_data(async_data),
);
let state_vec = Arc::new(RwLock::new(Vec::new()));
store
.to_signal()
.stop_if(|_| {
let len = state_vec.read().unwrap().len();
len >= 2
})
.for_each(|state| {
state_vec.write().unwrap().push(state.data);
async {}
})
.await;
let state_vec = state_vec
.read()
.unwrap()
.iter()
.cloned()
.collect::<Vec<_>>();
assert_eq!(state_vec[0], Async::success("initial".to_string()));
assert_eq!(state_vec[1], Async::loading(Some("initial".to_string())));
assert_eq!(state_vec[2], Async::success("success".to_string()));
}
#[tokio::test]
async fn test_execute_cancellable_with_retain_fail() {
let initial_state = TestState::default().set_async_data(Async::success("initial".to_string()));
let store = StateStore::new(initial_state);
let token = CancellationToken::new();
store.execute_cancellable_with_retain(
token.clone(),
|_| {
Err("Result".to_string())
},
|state| &state.data,
|state, async_data| state.set_async_data(async_data),
);
let state_vec = Arc::new(RwLock::new(Vec::new()));
store
.to_signal()
.stop_if(|_| {
let len = state_vec.read().unwrap().len();
len >= 2
})
.for_each(|state| {
state_vec.write().unwrap().push(state.data);
async {}
})
.await;
let state_vec = state_vec
.read()
.unwrap()
.iter()
.cloned()
.collect::<Vec<_>>();
assert_eq!(state_vec[0], Async::success("initial".to_string()));
assert_eq!(state_vec[1], Async::loading(Some("initial".to_string())));
assert_eq!(
state_vec[2],
Async::fail_with_message("Result".to_string(), Some("initial".to_string()))
);
}
#[tokio::test]
async fn test_execute_cancellable_with_retain_cancel() {
let initial_state = TestState::default().set_async_data(Async::success("initial".to_string()));
let store = StateStore::new(initial_state);
let token = CancellationToken::new();
store.execute_cancellable_with_retain(
token.clone(),
|token| {
token.cancel();
"Result".to_string()
},
|state| &state.data,
|state, async_data| state.set_async_data(async_data),
);
let state_vec = Arc::new(RwLock::new(Vec::new()));
store
.to_signal()
.stop_if(|_| {
let len = state_vec.read().unwrap().len();
len >= 2
})
.for_each(|state| {
state_vec.write().unwrap().push(state.data);
async {}
})
.await;
let state_vec = state_vec
.read()
.unwrap()
.iter()
.cloned()
.collect::<Vec<_>>();
assert_eq!(state_vec[0], Async::success("initial".to_string()));
assert_eq!(state_vec[1], Async::loading(Some("initial".to_string())));
assert_eq!(
state_vec[2],
Async::fail_with_cancelled(Some("initial".to_string()))
);
}
#[tokio::test]
async fn test_execute_with_timeout_success() {
let store = StateStore::new(TestState::default());
store.execute_with_timeout(
|| {
std::thread::sleep(std::time::Duration::from_millis(10));
"Delayed Result".to_string()
},
Duration::from_millis(50),
|state, async_data| state.set_async_data(async_data),
);
let mut state_vec = Vec::new();
store
.to_signal()
.stop_if(|state| state.data.is_complete())
.for_each(|state| {
state_vec.push(state.data);
async {}
})
.await;
assert_eq!(state_vec[0], Async::Uninitialized);
assert_eq!(state_vec[1], Async::loading(None));
assert_eq!(state_vec[2], Async::success("Delayed Result".to_string()));
}
#[tokio::test]
async fn test_execute_with_timeout_fail() {
let store = StateStore::new(TestState::default());
store.execute_with_timeout(
|| {
std::thread::sleep(std::time::Duration::from_millis(10));
Err("fail".to_string())
},
Duration::from_millis(50),
|state, async_data| state.set_async_data(async_data),
);
let mut state_vec = Vec::new();
store
.to_signal()
.stop_if(|state| state.data.is_complete())
.for_each(|state| {
state_vec.push(state.data);
async {}
})
.await;
assert_eq!(state_vec[0], Async::Uninitialized);
assert_eq!(state_vec[1], Async::loading(None));
assert_eq!(
state_vec[2],
Async::fail_with_message("fail".to_string(), None)
);
}
#[tokio::test]
async fn test_execute_with_timeout() {
let store = StateStore::new(TestState::default());
store.execute_with_timeout(
|| {
std::thread::sleep(std::time::Duration::from_millis(50));
"Delayed Result".to_string()
},
Duration::from_millis(10),
|state, async_data| state.set_async_data(async_data),
);
let mut state_vec = Vec::new();
store
.to_signal()
.stop_if(|state| state.data.is_complete())
.for_each(|state| {
state_vec.push(state.data);
async {}
})
.await;
assert_eq!(state_vec[0], Async::Uninitialized);
assert_eq!(state_vec[1], Async::loading(None));
assert_eq!(state_vec[2], Async::fail_with_timeout(None));
}