use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::time::{Duration, Instant};
use parking_lot::{Mutex, RwLock};
use tokio::task::JoinHandle;
use crate::fiber::FiberId;
use crate::fiber_tree::with_current_fiber;
use crate::hooks::use_effect;
use crate::scheduler::batch::{StateUpdate, StateUpdateKind, queue_update};
#[derive(Debug, Clone, PartialEq, Default)]
pub enum FutureState<T, E = String> {
#[default]
Idle,
Pending,
Resolved(T),
Error(E),
}
impl<T, E> FutureState<T, E> {
pub fn is_idle(&self) -> bool {
matches!(self, FutureState::Idle)
}
pub fn is_pending(&self) -> bool {
matches!(self, FutureState::Pending)
}
pub fn is_resolved(&self) -> bool {
matches!(self, FutureState::Resolved(_))
}
pub fn is_error(&self) -> bool {
matches!(self, FutureState::Error(_))
}
pub fn value(&self) -> Option<&T> {
match self {
FutureState::Resolved(v) => Some(v),
_ => None,
}
}
pub fn error(&self) -> Option<&E> {
match self {
FutureState::Error(e) => Some(e),
_ => None,
}
}
pub fn map<U, F>(self, f: F) -> FutureState<U, E>
where
F: FnOnce(T) -> U,
{
match self {
FutureState::Idle => FutureState::Idle,
FutureState::Pending => FutureState::Pending,
FutureState::Resolved(v) => FutureState::Resolved(f(v)),
FutureState::Error(e) => FutureState::Error(e),
}
}
pub fn map_err<F, G>(self, f: F) -> FutureState<T, G>
where
F: FnOnce(E) -> G,
{
match self {
FutureState::Idle => FutureState::Idle,
FutureState::Pending => FutureState::Pending,
FutureState::Resolved(v) => FutureState::Resolved(v),
FutureState::Error(e) => FutureState::Error(f(e)),
}
}
}
impl<T, E> From<Result<T, E>> for FutureState<T, E> {
fn from(result: Result<T, E>) -> Self {
match result {
Ok(v) => FutureState::Resolved(v),
Err(e) => FutureState::Error(e),
}
}
}
#[derive(Clone)]
pub struct FutureHandle<T, E = String>
where
T: Clone + Send + Sync + 'static,
E: Clone + Send + Sync + 'static,
{
state: Arc<RwLock<FutureState<T, E>>>,
task_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
cancelled: Arc<AtomicBool>,
}
impl<T, E> FutureHandle<T, E>
where
T: Clone + Send + Sync + 'static,
E: Clone + Send + Sync + 'static,
{
fn new() -> Self {
Self {
state: Arc::new(RwLock::new(FutureState::Idle)),
task_handle: Arc::new(Mutex::new(None)),
cancelled: Arc::new(AtomicBool::new(false)),
}
}
pub fn state(&self) -> FutureState<T, E> {
self.state.read().clone()
}
pub fn is_idle(&self) -> bool {
matches!(&*self.state.read(), FutureState::Idle)
}
pub fn is_pending(&self) -> bool {
matches!(&*self.state.read(), FutureState::Pending)
}
pub fn is_resolved(&self) -> bool {
matches!(&*self.state.read(), FutureState::Resolved(_))
}
pub fn is_error(&self) -> bool {
matches!(&*self.state.read(), FutureState::Error(_))
}
pub fn value(&self) -> Option<T> {
match &*self.state.read() {
FutureState::Resolved(v) => Some(v.clone()),
_ => None,
}
}
pub fn error(&self) -> Option<E> {
match &*self.state.read() {
FutureState::Error(e) => Some(e.clone()),
_ => None,
}
}
pub fn cancel(&self) {
self.cancelled.store(true, Ordering::SeqCst);
if let Some(handle) = self.task_handle.lock().take() {
handle.abort();
}
}
fn set_state(&self, new_state: FutureState<T, E>) {
*self.state.write() = new_state;
}
fn set_task_handle(&self, handle: JoinHandle<()>) {
*self.task_handle.lock() = Some(handle);
}
fn is_cancelled(&self) -> bool {
self.cancelled.load(Ordering::SeqCst)
}
}
impl<T, E> fmt::Debug for FutureHandle<T, E>
where
T: Clone + Send + Sync + fmt::Debug + 'static,
E: Clone + Send + Sync + fmt::Debug + 'static,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("FutureHandle")
.field("state", &*self.state.read())
.field("cancelled", &self.cancelled.load(Ordering::SeqCst))
.finish()
}
}
struct FutureHookStorage<T, E>
where
T: Clone + Send + Sync + 'static,
E: Clone + Send + Sync + 'static,
{
handle: FutureHandle<T, E>,
generation: u64,
}
impl<T, E> Clone for FutureHookStorage<T, E>
where
T: Clone + Send + Sync + 'static,
E: Clone + Send + Sync + 'static,
{
fn clone(&self) -> Self {
Self {
handle: self.handle.clone(),
generation: self.generation,
}
}
}
impl<T, E> FutureHookStorage<T, E>
where
T: Clone + Send + Sync + 'static,
E: Clone + Send + Sync + 'static,
{
fn new() -> Self {
Self {
handle: FutureHandle::new(),
generation: 0,
}
}
}
pub fn use_future<Deps, F, Fut, T, E>(future_factory: F, deps: Option<Deps>) -> FutureHandle<T, E>
where
Deps: PartialEq + Clone + Send + 'static,
F: FnOnce() -> Fut + Send + 'static,
Fut: Future<Output = Result<T, E>> + Send + 'static,
T: Clone + Send + Sync + 'static,
E: Clone + Send + Sync + ToString + 'static,
{
let (_fiber_id, _hook_index, handle, _generation) = with_current_fiber(|fiber| {
let hook_index = fiber.next_hook_index();
let storage: FutureHookStorage<T, E> =
fiber.get_or_init_hook(hook_index, FutureHookStorage::new);
(
fiber.id,
hook_index,
storage.handle.clone(),
storage.generation,
)
})
.expect("use_future must be called within a component render context");
let handle_for_effect = handle.clone();
use_effect(
move || {
handle_for_effect.cancel();
handle_for_effect.cancelled.store(false, Ordering::SeqCst);
handle_for_effect.set_state(FutureState::Pending);
let handle_for_task = handle_for_effect.clone();
let handle_for_cleanup = handle_for_effect.clone();
let task_handle = tokio::spawn(async move {
if handle_for_task.is_cancelled() {
return;
}
let result = future_factory().await;
if handle_for_task.is_cancelled() {
return;
}
match result {
Ok(value) => {
handle_for_task.set_state(FutureState::Resolved(value));
}
Err(error) => {
handle_for_task.set_state(FutureState::Error(error));
}
}
});
handle_for_effect.set_task_handle(task_handle);
Some(move || {
handle_for_cleanup.cancel();
})
},
deps,
);
handle
}
pub fn use_future_once<F, Fut, T, E>(future_factory: F) -> FutureHandle<T, E>
where
F: FnOnce() -> Fut + Send + 'static,
Fut: Future<Output = Result<T, E>> + Send + 'static,
T: Clone + Send + Sync + 'static,
E: Clone + Send + Sync + ToString + 'static,
{
use_future(future_factory, Some(()))
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum QueryStatus {
#[default]
Idle,
Loading,
Refreshing,
Success,
Error,
}
#[derive(Clone)]
pub struct QueryOptions {
pub enabled: bool,
pub stale_time: Duration,
pub cache_time: Duration,
pub retry: bool,
pub retry_attempts: u32,
pub retry_delay: Duration,
}
impl Default for QueryOptions {
fn default() -> Self {
Self {
enabled: true,
stale_time: Duration::from_secs(0),
cache_time: Duration::from_secs(300), retry: true,
retry_attempts: 3,
retry_delay: Duration::from_secs(1),
}
}
}
#[derive(Clone)]
pub struct QueryResult<T, E>
where
T: Clone + Send + Sync + 'static,
E: Clone + Send + Sync + 'static,
{
pub status: QueryStatus,
pub data: Option<T>,
pub error: Option<E>,
pub is_stale: bool,
pub is_fetching: bool,
refetch_fn: Arc<dyn Fn() + Send + Sync>,
invalidate_fn: Arc<dyn Fn() + Send + Sync>,
}
impl<T, E> QueryResult<T, E>
where
T: Clone + Send + Sync + 'static,
E: Clone + Send + Sync + 'static,
{
pub fn refetch(&self) {
(self.refetch_fn)();
}
pub fn invalidate(&self) {
(self.invalidate_fn)();
}
pub fn is_loading(&self) -> bool {
self.status == QueryStatus::Loading
}
pub fn is_success(&self) -> bool {
self.status == QueryStatus::Success
}
pub fn is_error(&self) -> bool {
self.status == QueryStatus::Error
}
}
impl<T, E> fmt::Debug for QueryResult<T, E>
where
T: Clone + Send + Sync + fmt::Debug + 'static,
E: Clone + Send + Sync + fmt::Debug + 'static,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("QueryResult")
.field("status", &self.status)
.field("data", &self.data)
.field("error", &self.error)
.field("is_stale", &self.is_stale)
.field("is_fetching", &self.is_fetching)
.finish()
}
}
struct QueryCacheEntry<T> {
data: T,
fetched_at: Instant,
}
type QueryCacheMap =
Arc<Mutex<std::collections::HashMap<String, Box<dyn std::any::Any + Send + Sync>>>>;
static QUERY_CACHE: once_cell::sync::Lazy<QueryCacheMap> =
once_cell::sync::Lazy::new(|| Arc::new(Mutex::new(std::collections::HashMap::new())));
struct QueryHookState<T, E>
where
T: Clone + Send + Sync + 'static,
E: Clone + Send + Sync + 'static,
{
status: QueryStatus,
data: Option<T>,
error: Option<E>,
is_stale: bool,
is_fetching: bool,
refetch_trigger: Arc<AtomicU64>,
}
impl<T, E> Clone for QueryHookState<T, E>
where
T: Clone + Send + Sync + 'static,
E: Clone + Send + Sync + 'static,
{
fn clone(&self) -> Self {
Self {
status: self.status,
data: self.data.clone(),
error: self.error.clone(),
is_stale: self.is_stale,
is_fetching: self.is_fetching,
refetch_trigger: self.refetch_trigger.clone(),
}
}
}
impl<T, E> Default for QueryHookState<T, E>
where
T: Clone + Send + Sync + 'static,
E: Clone + Send + Sync + 'static,
{
fn default() -> Self {
Self {
status: QueryStatus::Idle,
data: None,
error: None,
is_stale: false,
is_fetching: false,
refetch_trigger: Arc::new(AtomicU64::new(0)),
}
}
}
pub fn use_query<K, F, Fut, T, E>(
key: K,
query_fn: F,
options: Option<QueryOptions>,
) -> QueryResult<T, E>
where
K: std::hash::Hash + Eq + Clone + Send + Sync + std::fmt::Debug + 'static,
F: Fn() -> Fut + Clone + Send + Sync + 'static,
Fut: Future<Output = Result<T, E>> + Send + 'static,
T: Clone + Send + Sync + 'static,
E: Clone + Send + Sync + ToString + 'static,
{
let options = options.unwrap_or_default();
let cache_key = format!("{:?}", key);
let (fiber_id, hook_index) = with_current_fiber(|fiber| {
let hook_index = fiber.next_hook_index();
(fiber.id, hook_index)
})
.expect("use_query must be called within a component render context");
let state: QueryHookState<T, E> =
with_current_fiber(|fiber| fiber.get_or_init_hook(hook_index, QueryHookState::default))
.expect("use_query must be called within a component render context");
let refetch_trigger = state.refetch_trigger.clone();
let current_trigger = refetch_trigger.load(Ordering::SeqCst);
let refetch_fn = {
let trigger = refetch_trigger.clone();
Arc::new(move || {
trigger.fetch_add(1, Ordering::SeqCst);
}) as Arc<dyn Fn() + Send + Sync>
};
let invalidate_fn = {
let cache_key = cache_key.clone();
let trigger = refetch_trigger.clone();
Arc::new(move || {
QUERY_CACHE.lock().remove(&cache_key);
trigger.fetch_add(1, Ordering::SeqCst);
}) as Arc<dyn Fn() + Send + Sync>
};
let cached_data: Option<(T, bool)> = {
let cache = QUERY_CACHE.lock();
if let Some(entry) = cache.get(&cache_key) {
if let Some(cache_entry) = entry.downcast_ref::<QueryCacheEntry<T>>() {
let is_stale = cache_entry.fetched_at.elapsed() > options.stale_time;
let is_expired = cache_entry.fetched_at.elapsed() > options.cache_time;
if !is_expired {
Some((cache_entry.data.clone(), is_stale))
} else {
None
}
} else {
None
}
} else {
None
}
};
let (_initial_status, initial_data, initial_is_stale) = match &cached_data {
Some((data, is_stale)) => (QueryStatus::Success, Some(data.clone()), *is_stale),
None => (QueryStatus::Idle, None, false),
};
let cache_key_for_effect = cache_key.clone();
let options_for_effect = options.clone();
use_effect(
move || {
if !options_for_effect.enabled {
return None;
}
let should_fetch = match &cached_data {
None => true,
Some((_, is_stale)) => *is_stale,
};
if !should_fetch && current_trigger == 0 {
return None;
}
let new_status = if cached_data.is_some() {
QueryStatus::Refreshing
} else {
QueryStatus::Loading
};
queue_update(
fiber_id,
StateUpdate {
hook_index,
update: StateUpdateKind::Value(Box::new(QueryHookState::<T, E> {
status: new_status,
data: cached_data.as_ref().map(|(d, _)| d.clone()),
error: None,
is_stale: false,
is_fetching: true,
refetch_trigger: refetch_trigger.clone(),
})),
},
);
let query_fn = query_fn.clone();
let cache_key = cache_key_for_effect.clone();
let options = options_for_effect.clone();
let refetch_trigger = refetch_trigger.clone();
let cancelled = Arc::new(AtomicBool::new(false));
let cancelled_for_cleanup = cancelled.clone();
let task_handle = tokio::spawn(async move {
let mut attempts = 0;
let max_attempts = if options.retry {
options.retry_attempts
} else {
1
};
loop {
if cancelled.load(Ordering::SeqCst) {
return;
}
attempts += 1;
let result = query_fn().await;
if cancelled.load(Ordering::SeqCst) {
return;
}
match result {
Ok(data) => {
{
let mut cache = QUERY_CACHE.lock();
cache.insert(
cache_key.clone(),
Box::new(QueryCacheEntry {
data: data.clone(),
fetched_at: Instant::now(),
}),
);
}
queue_update(
fiber_id,
StateUpdate {
hook_index,
update: StateUpdateKind::Value(Box::new(QueryHookState::<
T,
E,
> {
status: QueryStatus::Success,
data: Some(data),
error: None,
is_stale: false,
is_fetching: false,
refetch_trigger: refetch_trigger.clone(),
})),
},
);
break;
}
Err(error) => {
if attempts >= max_attempts {
queue_update(
fiber_id,
StateUpdate {
hook_index,
update: StateUpdateKind::Value(Box::new(QueryHookState::<
T,
E,
> {
status: QueryStatus::Error,
data: cached_data.as_ref().map(|(d, _)| d.clone()),
error: Some(error),
is_stale: false,
is_fetching: false,
refetch_trigger: refetch_trigger.clone(),
})),
},
);
break;
}
tokio::time::sleep(options.retry_delay).await;
}
}
}
});
Some(move || {
cancelled_for_cleanup.store(true, Ordering::SeqCst);
task_handle.abort();
})
},
Some((cache_key.clone(), current_trigger, options.enabled)),
);
QueryResult {
status: state.status,
data: state.data.or(initial_data),
error: state.error,
is_stale: state.is_stale || initial_is_stale,
is_fetching: state.is_fetching,
refetch_fn,
invalidate_fn,
}
}
pub fn clear_query_cache() {
QUERY_CACHE.lock().clear();
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum MutationStatus {
#[default]
Idle,
Pending,
Success,
Error,
}
#[derive(Clone)]
pub struct MutationOptions {
pub retry: bool,
pub retry_attempts: u32,
pub retry_delay: Duration,
pub retry_exponential_backoff: bool,
pub retry_max_delay: Duration,
}
impl Default for MutationOptions {
fn default() -> Self {
Self {
retry: false,
retry_attempts: 0,
retry_delay: Duration::from_secs(1),
retry_exponential_backoff: false,
retry_max_delay: Duration::from_secs(30),
}
}
}
#[derive(Clone)]
pub struct MutationState<T, E>
where
T: Clone + Send + Sync + 'static,
E: Clone + Send + Sync + 'static,
{
pub status: MutationStatus,
pub data: Option<T>,
pub error: Option<E>,
pub is_pending: bool,
pub is_success: bool,
pub is_error: bool,
pub is_idle: bool,
}
impl<T, E> Default for MutationState<T, E>
where
T: Clone + Send + Sync + 'static,
E: Clone + Send + Sync + 'static,
{
fn default() -> Self {
Self {
status: MutationStatus::Idle,
data: None,
error: None,
is_pending: false,
is_success: false,
is_error: false,
is_idle: true,
}
}
}
impl<T, E> fmt::Debug for MutationState<T, E>
where
T: Clone + Send + Sync + fmt::Debug + 'static,
E: Clone + Send + Sync + fmt::Debug + 'static,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("MutationState")
.field("status", &self.status)
.field("data", &self.data)
.field("error", &self.error)
.field("is_pending", &self.is_pending)
.field("is_success", &self.is_success)
.field("is_error", &self.is_error)
.field("is_idle", &self.is_idle)
.finish()
}
}
type MutationFn<TData, TError, TVariables> = Arc<
dyn Fn(TVariables) -> Pin<Box<dyn Future<Output = Result<TData, TError>> + Send>> + Send + Sync,
>;
pub struct MutationHandle<TData, TError, TVariables>
where
TData: Clone + Send + Sync + 'static,
TError: Clone + Send + Sync + 'static,
TVariables: Clone + Send + Sync + 'static,
{
state: Arc<RwLock<MutationState<TData, TError>>>,
mutation_fn: MutationFn<TData, TError, TVariables>,
options: Arc<MutationOptions>,
task_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
fiber_id: FiberId,
hook_index: usize,
}
impl<TData, TError, TVariables> Clone for MutationHandle<TData, TError, TVariables>
where
TData: Clone + Send + Sync + 'static,
TError: Clone + Send + Sync + 'static,
TVariables: Clone + Send + Sync + 'static,
{
fn clone(&self) -> Self {
Self {
state: self.state.clone(),
mutation_fn: self.mutation_fn.clone(),
options: self.options.clone(),
task_handle: self.task_handle.clone(),
fiber_id: self.fiber_id,
hook_index: self.hook_index,
}
}
}
impl<TData, TError, TVariables> MutationHandle<TData, TError, TVariables>
where
TData: Clone + Send + Sync + 'static,
TError: Clone + Send + Sync + 'static,
TVariables: Clone + Send + Sync + 'static,
{
pub fn state(&self) -> MutationState<TData, TError> {
self.state.read().clone()
}
pub fn is_idle(&self) -> bool {
self.state.read().is_idle
}
pub fn is_pending(&self) -> bool {
self.state.read().is_pending
}
pub fn is_success(&self) -> bool {
self.state.read().is_success
}
pub fn is_error(&self) -> bool {
self.state.read().is_error
}
pub fn data(&self) -> Option<TData> {
self.state.read().data.clone()
}
pub fn error(&self) -> Option<TError> {
self.state.read().error.clone()
}
pub fn reset(&self) {
if let Some(handle) = self.task_handle.lock().take() {
handle.abort();
}
*self.state.write() = MutationState::default();
}
pub fn mutate(&self, variables: TVariables) {
let state = self.state.clone();
let mutation_fn = self.mutation_fn.clone();
let options = self.options.clone();
let task_handle = self.task_handle.clone();
let fiber_id = self.fiber_id;
let hook_index = self.hook_index;
if let Some(handle) = task_handle.lock().take() {
handle.abort();
}
{
let mut s = state.write();
s.status = MutationStatus::Pending;
s.is_pending = true;
s.is_idle = false;
s.is_success = false;
s.is_error = false;
s.error = None;
}
queue_update(
fiber_id,
StateUpdate {
hook_index,
update: StateUpdateKind::Value(Box::new(state.read().clone())),
},
);
let handle = tokio::spawn(async move {
let mut attempts = 0;
let max_attempts = if options.retry {
options.retry_attempts + 1
} else {
1
};
loop {
attempts += 1;
let result = mutation_fn(variables.clone()).await;
match result {
Ok(data) => {
{
let mut s = state.write();
s.status = MutationStatus::Success;
s.data = Some(data);
s.error = None;
s.is_pending = false;
s.is_success = true;
s.is_error = false;
s.is_idle = false;
}
queue_update(
fiber_id,
StateUpdate {
hook_index,
update: StateUpdateKind::Value(Box::new(state.read().clone())),
},
);
break;
}
Err(error) => {
if attempts >= max_attempts {
{
let mut s = state.write();
s.status = MutationStatus::Error;
s.error = Some(error);
s.is_pending = false;
s.is_success = false;
s.is_error = true;
s.is_idle = false;
}
queue_update(
fiber_id,
StateUpdate {
hook_index,
update: StateUpdateKind::Value(Box::new(state.read().clone())),
},
);
break;
}
let delay = if options.retry_exponential_backoff {
let exp_delay = options
.retry_delay
.checked_mul(2_u32.pow(attempts - 1))
.unwrap_or(options.retry_max_delay);
exp_delay.min(options.retry_max_delay)
} else {
options.retry_delay
};
tokio::time::sleep(delay).await;
}
}
}
});
*task_handle.lock() = Some(handle);
}
pub async fn mutate_async(&self, variables: TVariables) -> Result<TData, TError> {
let state = self.state.clone();
let mutation_fn = self.mutation_fn.clone();
let options = self.options.clone();
let fiber_id = self.fiber_id;
let hook_index = self.hook_index;
{
let mut s = state.write();
s.status = MutationStatus::Pending;
s.is_pending = true;
s.is_idle = false;
s.is_success = false;
s.is_error = false;
s.error = None;
}
queue_update(
fiber_id,
StateUpdate {
hook_index,
update: StateUpdateKind::Value(Box::new(state.read().clone())),
},
);
let mut attempts = 0;
let max_attempts = if options.retry {
options.retry_attempts + 1
} else {
1
};
loop {
attempts += 1;
let result = mutation_fn(variables.clone()).await;
match result {
Ok(data) => {
{
let mut s = state.write();
s.status = MutationStatus::Success;
s.data = Some(data.clone());
s.error = None;
s.is_pending = false;
s.is_success = true;
s.is_error = false;
s.is_idle = false;
}
queue_update(
fiber_id,
StateUpdate {
hook_index,
update: StateUpdateKind::Value(Box::new(state.read().clone())),
},
);
return Ok(data);
}
Err(error) => {
if attempts >= max_attempts {
{
let mut s = state.write();
s.status = MutationStatus::Error;
s.error = Some(error.clone());
s.is_pending = false;
s.is_success = false;
s.is_error = true;
s.is_idle = false;
}
queue_update(
fiber_id,
StateUpdate {
hook_index,
update: StateUpdateKind::Value(Box::new(state.read().clone())),
},
);
return Err(error);
}
let delay = if options.retry_exponential_backoff {
let exp_delay = options
.retry_delay
.checked_mul(2_u32.pow(attempts - 1))
.unwrap_or(options.retry_max_delay);
exp_delay.min(options.retry_max_delay)
} else {
options.retry_delay
};
tokio::time::sleep(delay).await;
}
}
}
}
}
impl<TData, TError, TVariables> fmt::Debug for MutationHandle<TData, TError, TVariables>
where
TData: Clone + Send + Sync + fmt::Debug + 'static,
TError: Clone + Send + Sync + fmt::Debug + 'static,
TVariables: Clone + Send + Sync + 'static,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("MutationHandle")
.field("state", &*self.state.read())
.finish()
}
}
pub fn use_mutation<TData, TError, TVariables, F, Fut>(
mutation_fn: F,
options: Option<MutationOptions>,
) -> MutationHandle<TData, TError, TVariables>
where
TData: Clone + Send + Sync + 'static,
TError: Clone + Send + Sync + 'static,
TVariables: Clone + Send + Sync + 'static,
F: Fn(TVariables) -> Fut + Send + Sync + 'static,
Fut: Future<Output = Result<TData, TError>> + Send + 'static,
{
let options = options.unwrap_or_default();
let (fiber_id, hook_index) = with_current_fiber(|fiber| {
let hook_index = fiber.next_hook_index();
(fiber.id, hook_index)
})
.expect("use_mutation must be called within a component render context");
let state: Arc<RwLock<MutationState<TData, TError>>> = with_current_fiber(|fiber| {
fiber.get_or_init_hook(hook_index, || {
Arc::new(RwLock::new(MutationState::default()))
})
})
.expect("use_mutation must be called within a component render context");
let boxed_fn: MutationFn<TData, TError, TVariables> = Arc::new(move |variables: TVariables| {
Box::pin(mutation_fn(variables))
as Pin<Box<dyn Future<Output = Result<TData, TError>> + Send>>
});
MutationHandle {
state,
mutation_fn: boxed_fn,
options: Arc::new(options),
task_handle: Arc::new(Mutex::new(None)),
fiber_id,
hook_index,
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::fiber_tree::{FiberTree, clear_fiber_tree, set_fiber_tree};
use crate::scheduler::batch::clear_state_batch;
use crate::scheduler::effect_queue::clear_effect_queue;
fn setup_test_fiber() -> FiberId {
clear_effect_queue();
clear_state_batch();
let mut tree = FiberTree::new();
let fiber_id = tree.mount(None, None);
tree.begin_render(fiber_id);
set_fiber_tree(tree);
fiber_id
}
fn cleanup_test() {
clear_fiber_tree();
clear_effect_queue();
clear_state_batch();
clear_query_cache();
}
#[test]
fn test_future_state_default() {
let state: FutureState<i32, String> = FutureState::default();
assert!(state.is_idle());
assert!(!state.is_pending());
assert!(!state.is_resolved());
assert!(!state.is_error());
}
#[test]
fn test_future_state_methods() {
let idle: FutureState<i32, String> = FutureState::Idle;
assert!(idle.is_idle());
assert!(idle.value().is_none());
assert!(idle.error().is_none());
let pending: FutureState<i32, String> = FutureState::Pending;
assert!(pending.is_pending());
let resolved: FutureState<i32, String> = FutureState::Resolved(42);
assert!(resolved.is_resolved());
assert_eq!(resolved.value(), Some(&42));
let error: FutureState<i32, String> = FutureState::Error("error".to_string());
assert!(error.is_error());
assert_eq!(error.error(), Some(&"error".to_string()));
}
#[test]
fn test_future_state_map() {
let resolved: FutureState<i32, String> = FutureState::Resolved(42);
let mapped = resolved.map(|x| x * 2);
assert_eq!(mapped, FutureState::Resolved(84));
let error: FutureState<i32, String> = FutureState::Error("error".to_string());
let mapped_err = error.map_err(|e| format!("wrapped: {}", e));
assert_eq!(mapped_err, FutureState::Error("wrapped: error".to_string()));
}
#[test]
fn test_future_state_from_result() {
let ok: FutureState<i32, String> = Ok(42).into();
assert_eq!(ok, FutureState::Resolved(42));
let err: FutureState<i32, String> = Err("error".to_string()).into();
assert_eq!(err, FutureState::Error("error".to_string()));
}
#[test]
fn test_future_handle_new() {
let handle: FutureHandle<i32, String> = FutureHandle::new();
assert!(handle.is_idle());
assert!(!handle.is_pending());
assert!(!handle.is_resolved());
assert!(!handle.is_error());
assert!(handle.value().is_none());
assert!(handle.error().is_none());
}
#[test]
fn test_future_handle_set_state() {
let handle: FutureHandle<i32, String> = FutureHandle::new();
handle.set_state(FutureState::Pending);
assert!(handle.is_pending());
handle.set_state(FutureState::Resolved(42));
assert!(handle.is_resolved());
assert_eq!(handle.value(), Some(42));
handle.set_state(FutureState::Error("error".to_string()));
assert!(handle.is_error());
assert_eq!(handle.error(), Some("error".to_string()));
}
#[test]
fn test_future_handle_cancel() {
let handle: FutureHandle<i32, String> = FutureHandle::new();
assert!(!handle.is_cancelled());
handle.cancel();
assert!(handle.is_cancelled());
}
#[test]
fn test_use_future_returns_handle() {
let _fiber_id = setup_test_fiber();
let handle = use_future(|| async { Ok::<i32, String>(42) }, Some(()));
assert!(!handle.is_cancelled());
cleanup_test();
}
#[test]
fn test_query_status_default() {
let status = QueryStatus::default();
assert_eq!(status, QueryStatus::Idle);
}
#[test]
fn test_query_options_default() {
let options = QueryOptions::default();
assert!(options.enabled);
assert_eq!(options.stale_time, Duration::from_secs(0));
assert_eq!(options.cache_time, Duration::from_secs(300));
assert!(options.retry);
assert_eq!(options.retry_attempts, 3);
}
#[test]
fn test_mutation_status_default() {
let status = MutationStatus::default();
assert_eq!(status, MutationStatus::Idle);
}
#[test]
fn test_mutation_options_default() {
let options = MutationOptions::default();
assert!(!options.retry);
assert_eq!(options.retry_attempts, 0);
assert_eq!(options.retry_delay, Duration::from_secs(1));
assert!(!options.retry_exponential_backoff);
}
#[test]
fn test_mutation_state_default() {
let state: MutationState<i32, String> = MutationState::default();
assert_eq!(state.status, MutationStatus::Idle);
assert!(state.data.is_none());
assert!(state.error.is_none());
assert!(!state.is_pending);
assert!(!state.is_success);
assert!(!state.is_error);
assert!(state.is_idle);
}
#[test]
fn test_use_mutation_returns_handle() {
let _fiber_id = setup_test_fiber();
let mutation = use_mutation(|x: i32| async move { Ok::<i32, String>(x * 2) }, None);
assert!(mutation.is_idle());
assert!(!mutation.is_pending());
assert!(!mutation.is_success());
assert!(!mutation.is_error());
cleanup_test();
}
#[test]
fn test_use_mutation_reset() {
let _fiber_id = setup_test_fiber();
let mutation = use_mutation(|x: i32| async move { Ok::<i32, String>(x * 2) }, None);
{
let mut state = mutation.state.write();
state.status = MutationStatus::Success;
state.data = Some(42);
state.is_success = true;
state.is_idle = false;
}
assert!(mutation.is_success());
mutation.reset();
assert!(mutation.is_idle());
assert!(mutation.data().is_none());
cleanup_test();
}
#[test]
fn test_use_query_returns_result() {
let _fiber_id = setup_test_fiber();
let result = use_query("test-key", || async { Ok::<i32, String>(42) }, None);
assert!(result.status == QueryStatus::Idle || result.status == QueryStatus::Loading);
cleanup_test();
}
#[test]
fn test_use_query_disabled() {
let _fiber_id = setup_test_fiber();
let result = use_query(
"disabled-key",
|| async { Ok::<i32, String>(42) },
Some(QueryOptions {
enabled: false,
..Default::default()
}),
);
assert_eq!(result.status, QueryStatus::Idle);
cleanup_test();
}
#[test]
fn test_clear_query_cache() {
{
let mut cache = QUERY_CACHE.lock();
cache.insert(
"test".to_string(),
Box::new(QueryCacheEntry {
data: 42i32,
fetched_at: Instant::now(),
}),
);
}
assert!(!QUERY_CACHE.lock().is_empty());
clear_query_cache();
assert!(QUERY_CACHE.lock().is_empty());
}
}