use crate::reactive::RwSignal;
use crate::reactive::{__reset_for_tests, flush, resource, resource_sync, Owner, ResourceState};
use crate::tasks;
fn with_test_owner<R>(f: impl FnOnce() -> R) -> R {
__reset_for_tests();
tasks::__reset_for_tests();
let owner = Owner::new(None);
owner.with(f)
}
#[test]
fn resource_sync_ready_state_for_ok_fetch() {
with_test_owner(|| {
let r = resource_sync(|| Ok::<_, String>(42_i32));
assert!(matches!(r.state(), ResourceState::Ready(42)));
assert_eq!(r.get(), Some(42));
assert!(!r.loading());
assert!(r.error().is_none());
});
}
#[test]
fn resource_sync_error_state_for_err_fetch() {
with_test_owner(|| {
let r = resource_sync(|| Err::<i32, _>("oops".to_string()));
assert!(matches!(r.state(), ResourceState::Error(_)));
assert_eq!(r.get(), None);
assert!(!r.loading());
assert_eq!(r.error().as_deref(), Some("oops"));
});
}
#[test]
fn async_resource_starts_in_loading_state() {
with_test_owner(|| {
let r = resource::<i32, _, _>(|| async { Ok(7) });
assert!(r.loading());
assert!(matches!(r.state(), ResourceState::Loading));
assert_eq!(r.get(), None);
assert!(r.error().is_none());
});
}
#[test]
fn async_resource_transitions_to_ready_after_tick() {
with_test_owner(|| {
let r = resource::<i32, _, _>(|| async { Ok(99) });
tasks::run_until_stalled();
assert!(matches!(r.state(), ResourceState::Ready(99)));
assert_eq!(r.get(), Some(99));
assert!(!r.loading());
});
}
#[test]
fn async_resource_transitions_to_error_on_err_result() {
with_test_owner(|| {
let r = resource::<i32, _, _>(|| async { Err("boom".to_string()) });
tasks::run_until_stalled();
assert!(matches!(r.state(), ResourceState::Error(_)));
assert_eq!(r.error().as_deref(), Some("boom"));
assert!(!r.loading());
});
}
#[test]
fn async_resource_with_pending_future_stays_loading() {
use std::pin::Pin;
use std::task::{Context, Poll};
struct NeverReady;
impl std::future::Future for NeverReady {
type Output = Result<i32, String>;
fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
Poll::Pending
}
}
with_test_owner(|| {
let r = resource::<i32, _, _>(|| NeverReady);
tasks::run_until_stalled();
assert!(r.loading(), "never-ready future must keep resource Loading");
});
}
#[test]
fn async_resource_multi_step_future_completes_within_one_tick() {
use std::pin::Pin;
use std::task::{Context, Poll};
struct OneYield(bool);
impl std::future::Future for OneYield {
type Output = Result<i32, String>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if !self.0 {
self.0 = true;
cx.waker().wake_by_ref();
Poll::Pending
} else {
Poll::Ready(Ok(123))
}
}
}
with_test_owner(|| {
let r = resource::<i32, _, _>(|| OneYield(false));
tasks::run_until_stalled();
assert_eq!(r.get(), Some(123));
});
}
#[test]
fn async_resource_tracks_signal_read_in_sync_prefix_and_refetches() {
with_test_owner(|| {
let query = RwSignal::new(String::new());
let r = resource::<String, _, _>(move || {
let q = query.get();
async move {
if q.trim().is_empty() {
Ok(String::new())
} else {
Ok(q.to_uppercase())
}
}
});
tasks::run_until_stalled();
assert_eq!(r.get().as_deref(), Some(""));
query.set("Slime".to_string());
flush(); tasks::run_until_stalled(); assert_eq!(
r.get().as_deref(),
Some("SLIME"),
"resource must re-fetch with the new query after sync-prefix read changes"
);
query.set("Goo".to_string());
flush();
tasks::run_until_stalled();
assert_eq!(r.get().as_deref(), Some("GOO"));
});
}
#[test]
fn async_resource_tracks_signal_read_after_await_and_refetches() {
use std::pin::Pin;
use std::task::{Context, Poll};
struct OneYield(bool);
impl std::future::Future for OneYield {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
if !self.0 {
self.0 = true;
cx.waker().wake_by_ref();
Poll::Pending
} else {
Poll::Ready(())
}
}
}
with_test_owner(|| {
let multiplier = RwSignal::new(2_i32);
let r = resource::<i32, _, _>(move || async move {
OneYield(false).await;
let m = multiplier.get();
Ok(m * 10)
});
tasks::run_until_stalled();
assert_eq!(
r.get(),
Some(20),
"first fetch should suspend, resume, read multiplier=2 → 20"
);
multiplier.set(5);
flush();
tasks::run_until_stalled();
assert_eq!(
r.get(),
Some(50),
"resource must track a signal read AFTER an .await and re-fetch on change"
);
});
}
#[test]
fn async_resource_returns_to_loading_during_refetch() {
use std::pin::Pin;
use std::task::{Context, Poll};
struct OneYield(bool);
impl std::future::Future for OneYield {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
if !self.0 {
self.0 = true;
cx.waker().wake_by_ref();
Poll::Pending
} else {
Poll::Ready(())
}
}
}
with_test_owner(|| {
let n = RwSignal::new(1_i32);
let r = resource::<i32, _, _>(move || {
let v = n.get();
async move {
OneYield(false).await;
Ok(v)
}
});
tasks::run_until_stalled();
assert_eq!(r.get(), Some(1));
n.set(2);
flush();
assert!(
r.loading(),
"resource should be Loading during an in-flight re-fetch"
);
tasks::run_until_stalled();
assert_eq!(r.get(), Some(2));
});
}
mod cross_thread_wake {
use super::*;
use crate::main_thread::{set_drive_callback, set_main_thread_dispatcher, DispatchFn};
use crate::tasks::run_blocking;
use std::ffi::c_void;
use std::sync::{Mutex, MutexGuard};
use std::time::{Duration, Instant};
fn lock<'a>() -> MutexGuard<'a, ()> {
crate::main_thread::host_test_lock()
}
struct Post {
callback: extern "C" fn(*mut c_void),
user_data: *mut c_void,
}
unsafe impl Send for Post {}
static POSTS: Mutex<Vec<Post>> = Mutex::new(Vec::new());
extern "C" fn request_frame_noop(_: *mut c_void) {}
extern "C" fn enqueue_post(
_engine: *mut c_void,
callback: extern "C" fn(*mut c_void),
user_data: *mut c_void,
) -> bool {
if let Ok(mut q) = POSTS.lock() {
q.push(Post {
callback,
user_data,
});
}
true
}
extern "C" fn test_drive() {
flush();
tasks::run_until_stalled();
flush();
}
fn install_host() {
if let Ok(mut q) = POSTS.lock() {
q.clear();
}
set_main_thread_dispatcher(Some(enqueue_post as DispatchFn), std::ptr::null_mut());
set_drive_callback(Some(test_drive));
crate::host_wake::set_request_frame_callback(
Some(request_frame_noop),
std::ptr::null_mut(),
);
}
fn reset_host() {
crate::main_thread::__reset_for_tests();
crate::host_wake::__reset_for_tests();
if let Ok(mut q) = POSTS.lock() {
q.clear();
}
}
fn drain_posts() -> bool {
let batch: Vec<Post> = match POSTS.lock() {
Ok(mut q) => std::mem::take(&mut *q),
Err(_) => Vec::new(),
};
let drained = !batch.is_empty();
for post in batch {
(post.callback)(post.user_data);
}
drained
}
fn tick() -> bool {
flush();
tasks::run_until_stalled();
flush();
true }
fn drive_until(mut done: impl FnMut() -> bool) {
let deadline = Instant::now() + Duration::from_secs(2);
let mut vsync_idle = false;
while !done() && Instant::now() < deadline {
if !vsync_idle {
vsync_idle = tick();
std::thread::sleep(Duration::from_millis(1));
} else {
drain_posts();
std::thread::sleep(Duration::from_millis(2));
}
}
}
#[test]
fn reactive_resource_reading_signal_with_run_blocking_completes_after_refetch() {
use std::cell::Cell;
use std::rc::Rc;
let _g = lock();
__reset_for_tests();
tasks::__reset_for_tests();
install_host();
let runs = Rc::new(Cell::new(0u32));
let runs_for_fetcher = runs.clone();
let owner = Owner::new(None);
owner.with(|| {
let query = RwSignal::new(1_i32);
let r = resource::<i32, _, _>(move || {
runs_for_fetcher.set(runs_for_fetcher.get() + 1);
let q = query.get();
async move {
let v = run_blocking(move || {
std::thread::sleep(Duration::from_millis(15));
q * 10
})
.await;
Ok::<i32, String>(v)
}
});
let seen = Rc::new(Cell::new(0i32));
let seen_for_effect = seen.clone();
crate::reactive::effect::effect(move || {
if let ResourceState::Ready(v) = r.state() {
seen_for_effect.set(v);
}
});
drive_until(|| !r.loading());
assert_eq!(
r.get(),
Some(10),
"initial reactive+run_blocking fetch must complete (query=1 → 10)"
);
query.set(7);
drive_until(|| matches!(r.state(), ResourceState::Ready(70)));
assert_eq!(
r.get(),
Some(70),
"reactive resource + run_blocking must finish the re-fetch \
after the tracked signal changes (query=7 → 70), not stay Loading"
);
assert_eq!(
runs.get(),
2,
"fetcher must run exactly twice (initial + one re-fetch); a \
spurious extra run would bump the generation and abandon the \
in-flight fetch"
);
assert_eq!(seen.get(), 70, "consumer effect observed the final value");
});
owner.dispose();
reset_host();
}
#[test]
fn reactive_resource_signal_read_after_run_blocking_await_refetches() {
let _g = lock();
__reset_for_tests();
tasks::__reset_for_tests();
install_host();
let owner = Owner::new(None);
owner.with(|| {
let multiplier = RwSignal::new(2_i32);
let r = resource::<i32, _, _>(move || async move {
let base = run_blocking(|| {
std::thread::sleep(Duration::from_millis(15));
10_i32
})
.await;
let m = multiplier.get();
Ok::<i32, String>(base * m)
});
drive_until(|| !r.loading());
assert_eq!(r.get(), Some(20), "initial: 10 * multiplier(2) = 20");
multiplier.set(5);
drive_until(|| matches!(r.state(), ResourceState::Ready(50)));
assert_eq!(
r.get(),
Some(50),
"after-await tracked read + run_blocking must re-fetch (10*5=50)"
);
});
owner.dispose();
reset_host();
}
#[test]
fn reactive_resource_overlapping_refetch_completes_latest() {
use std::cell::Cell;
use std::rc::Rc;
let _g = lock();
__reset_for_tests();
tasks::__reset_for_tests();
install_host();
let owner = Owner::new(None);
owner.with(|| {
let query = RwSignal::new(1_i32);
let r = resource::<i32, _, _>(move || {
let q = query.get();
async move {
let v = run_blocking(move || {
std::thread::sleep(Duration::from_millis(40));
q * 10
})
.await;
Ok::<i32, String>(v)
}
});
let seen = Rc::new(Cell::new(0i32));
let seen_for_effect = seen.clone();
crate::reactive::effect::effect(move || {
if let ResourceState::Ready(v) = r.state() {
seen_for_effect.set(v);
}
});
tick();
assert!(r.loading());
query.set(7);
drive_until(|| matches!(r.state(), ResourceState::Ready(70)));
assert_eq!(
r.get(),
Some(70),
"overlapping re-fetch must settle on the latest query (7 → 70), \
not hang in Loading"
);
});
owner.dispose();
reset_host();
}
}
#[test]
fn resource_state_helpers_match_active_branch() {
let loading: ResourceState<i32> = ResourceState::Loading;
assert!(loading.is_loading());
assert!(!loading.is_ready());
assert!(!loading.is_error());
let ready: ResourceState<i32> = ResourceState::Ready(1);
assert!(!ready.is_loading());
assert!(ready.is_ready());
assert!(!ready.is_error());
let err: ResourceState<i32> = ResourceState::Error("x".into());
assert!(!err.is_loading());
assert!(!err.is_ready());
assert!(err.is_error());
}