use core::fmt;
use std::{
cell::{Ref, RefCell},
collections::{HashMap, HashSet},
future::Future,
hash::Hash,
mem,
rc::Rc,
sync::{Arc, Mutex},
time::Duration,
};
use dioxus::prelude::*;
use dioxus::signals::CopyValue;
use dioxus_core::{
provide_root_context, spawn_forever, use_drop, ReactiveContext, SuspendedFuture, Task,
};
use futures_util::stream::{FuturesUnordered, StreamExt};
use tokio::sync::Notify;
#[cfg(not(target_family = "wasm"))]
use tokio::time;
#[cfg(not(target_family = "wasm"))]
use tokio::time::Instant;
#[cfg(target_family = "wasm")]
use wasmtimer::tokio as time;
#[cfg(target_family = "wasm")]
use web_time::Instant;
/// Describes one kind of query: how to asynchronously produce a result from a set of keys.
///
/// Implementors must be `'static`, cloneable, comparable and hashable.
pub trait QueryCapability: 'static + Clone + PartialEq + Eq + Hash {
    /// Value produced when [`run`](Self::run) succeeds.
    type Ok;
    /// Error produced when [`run`](Self::run) fails.
    type Err;
    /// Input passed to [`run`](Self::run) and [`matches`](Self::matches).
    type Keys: Hash + PartialEq + Clone;

    /// Asynchronously computes the result for `keys`.
    fn run(&self, keys: &Self::Keys) -> impl Future<Output = Result<Self::Ok, Self::Err>>;

    /// Reports whether this query is targeted by `keys`.
    ///
    /// The default implementation accepts any keys.
    fn matches(&self, _keys: &Self::Keys) -> bool {
        true
    }
}
/// The lifecycle state of a query and, when available, its result.
pub enum QueryStateData<Q: QueryCapability> {
/// No result is stored and no run has been started for this state.
Pending,
/// A run is in progress; `res` carries the previously stored result, if there was one.
Loading { res: Option<Result<Q::Ok, Q::Err>> },
/// A run finished with `res` at `settlement_instant`.
Settled {
res: Result<Q::Ok, Q::Err>,
settlement_instant: Instant,
},
}
/// Extracts the stored result out of a state, when it carries one.
///
/// Fails with `()` for `Pending` and for `Loading` without a previous result.
impl<Q: QueryCapability> TryFrom<QueryStateData<Q>> for Result<Q::Ok, Q::Err> {
    type Error = ();

    fn try_from(value: QueryStateData<Q>) -> Result<Self, Self::Error> {
        match value {
            QueryStateData::Settled { res, .. } | QueryStateData::Loading { res: Some(res) } => {
                Ok(res)
            }
            QueryStateData::Pending | QueryStateData::Loading { res: None } => Err(()),
        }
    }
}
/// Compact debug output: the variant name followed by the stored result, if any.
/// The settlement instant is not printed.
impl<Q> fmt::Debug for QueryStateData<Q>
where
    Q: QueryCapability,
    Q::Ok: fmt::Debug,
    Q::Err: fmt::Debug,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Settled { res, .. } => write!(f, "Settled {{ {res:?} }}"),
            Self::Loading { res } => write!(f, "Loading {{ {res:?} }}"),
            Self::Pending => f.write_str("Pending"),
        }
    }
}
impl<Q: QueryCapability> QueryStateData<Q> {
    /// Returns `true` only when the query is settled with an `Ok` result.
    pub fn is_ok(&self) -> bool {
        matches!(self, QueryStateData::Settled { res: Ok(_), .. })
    }

    /// Returns `true` only when the query is settled with an `Err` result.
    pub fn is_err(&self) -> bool {
        matches!(self, QueryStateData::Settled { res: Err(_), .. })
    }

    /// Returns `true` while a run is in progress, with or without a previous result.
    pub fn is_loading(&self) -> bool {
        matches!(self, QueryStateData::Loading { .. })
    }

    /// Returns `true` when the state is `Pending`.
    pub fn is_pending(&self) -> bool {
        matches!(self, QueryStateData::Pending)
    }

    /// Returns `true` when the state should be refreshed for `query`.
    ///
    /// `Pending` and `Loading` are always stale; a settled state is stale once at least
    /// `query.stale_time` has passed since it settled.
    pub fn is_stale(&self, query: &Query<Q>) -> bool {
        match self {
            QueryStateData::Pending | QueryStateData::Loading { .. } => true,
            QueryStateData::Settled {
                settlement_instant, ..
            } => Instant::now().duration_since(*settlement_instant) >= query.stale_time,
        }
    }

    /// Returns the successful value, whether settled or kept from a previous run while loading.
    pub fn ok(&self) -> Option<&Q::Ok> {
        match self {
            Self::Settled { res: Ok(res), .. } | Self::Loading { res: Some(Ok(res)) } => Some(res),
            _ => None,
        }
    }

    /// Returns the stored result, whether settled or kept from a previous run while loading.
    ///
    /// # Panics
    ///
    /// Panics when the state holds no result, i.e. `Pending` or `Loading { res: None }`.
    #[track_caller]
    pub fn unwrap(&self) -> &Result<Q::Ok, Q::Err> {
        match self {
            Self::Loading { res: Some(res) } | Self::Settled { res, .. } => res,
            // These states are reachable by callers, so report a real diagnostic instead of
            // claiming the branch is unreachable.
            Self::Pending | Self::Loading { res: None } => {
                panic!("called `QueryStateData::unwrap()` on a state that holds no result")
            }
        }
    }

    /// Converts the state into `Loading`, carrying over any result it already holds.
    fn into_loading(self) -> QueryStateData<Q> {
        match self {
            QueryStateData::Pending => QueryStateData::Loading { res: None },
            QueryStateData::Loading { res } => QueryStateData::Loading { res },
            QueryStateData::Settled { res, .. } => QueryStateData::Loading { res: Some(res) },
        }
    }
}
/// Copyable handle to the map from each [`Query`] to its shared [`QueryData`].
pub struct QueriesStorage<Q: QueryCapability> {
// Keyed by the full `Query` value (capability, keys and settings).
storage: CopyValue<HashMap<Query<Q>, QueryData<Q>>>,
}
/// Cloning copies the handle; the underlying map is not duplicated.
impl<Q: QueryCapability> Clone for QueriesStorage<Q> {
    fn clone(&self) -> Self {
        *self
    }
}

/// Implemented by hand so that `Q` itself is not required to be `Copy`.
impl<Q: QueryCapability> Copy for QueriesStorage<Q> {}
/// Bookkeeping for a query that a component is suspended on.
struct QuerySuspenseData {
// Notified (`notify_waiters`) each time a run of the query settles.
notifier: Arc<Notify>,
// Task that waits on `notifier`; it is the task handed to `SuspendedFuture`.
task: Task,
}
/// Shared per-query data; clones share the same underlying cells.
pub struct QueryData<Q: QueryCapability> {
// Current state and result of the query.
state: Rc<RefCell<QueryStateData<Q>>>,
// Reactive contexts that are marked dirty whenever the state changes.
reactive_contexts: Arc<Mutex<HashSet<ReactiveContext>>>,
// Present while some reader is suspended waiting for the first result.
suspense_task: Rc<RefCell<Option<QuerySuspenseData>>>,
// Interval and task that periodically re-runs the query, if any.
interval_task: Rc<RefCell<Option<(Duration, Task)>>>,
// Task that removes the query from storage after its `clean_time`, if scheduled.
clean_task: Rc<RefCell<Option<Task>>>,
}
impl<Q: QueryCapability> Clone for QueryData<Q> {
fn clone(&self) -> Self {
Self {
state: self.state.clone(),
reactive_contexts: self.reactive_contexts.clone(),
suspense_task: self.suspense_task.clone(),
interval_task: self.interval_task.clone(),
clean_task: self.clean_task.clone(),
}
}
}
impl<Q: QueryCapability> QueriesStorage<Q> {
/// Creates an empty storage whose backing value is created in `ScopeId::ROOT`.
fn new_in_root() -> Self {
    let storage = CopyValue::new_in_scope(HashMap::default(), ScopeId::ROOT);
    Self { storage }
}
/// Returns the shared data for `query`, inserting a fresh `Pending` entry when none exists.
///
/// Also cancels any scheduled clean task of the entry and, when `query.interval_time`
/// is not `Duration::MAX`, makes sure an interval task that re-runs the query exists.
fn insert_or_get_query(&mut self, query: Query<Q>) -> QueryData<Q> {
// Kept for the interval task below; `query` itself is consumed by `entry`.
let query_clone = query.clone();
let mut storage = self.storage.write();
let query_data = storage.entry(query).or_insert_with(|| QueryData {
state: Rc::new(RefCell::new(QueryStateData::Pending)),
reactive_contexts: Arc::default(),
suspense_task: Rc::default(),
interval_task: Rc::default(),
clean_task: Rc::default(),
});
// Handle moved into the interval task so it can re-run the query on its own.
let query_data_clone = query_data.clone();
// The entry is requested again, so a scheduled removal must not fire.
if let Some(clean_task) = query_data.clean_task.take() {
clean_task.cancel();
}
let interval = query_clone.interval_time;
// `Duration::MAX` is treated as "no interval".
let interval_enabled = query_clone.interval_time != Duration::MAX;
let interval_task = &mut *query_data.interval_task.borrow_mut();
let create_interval_task = match interval_task {
// No interval task yet: create one.
None if interval_enabled => true,
// An interval task exists: replace it only when the requested interval is shorter.
Some((current_interval, current_interval_task)) if interval_enabled => {
let new_interval_is_shorter = *current_interval > interval;
if new_interval_is_shorter {
current_interval_task.cancel();
*interval_task = None;
}
new_interval_is_shorter
}
_ => false,
};
if create_interval_task {
// Re-run the query every `interval`, indefinitely, until the task is cancelled.
let task = spawn_forever(async move {
loop {
time::sleep(interval).await;
QueriesStorage::<Q>::run_queries(&[(&query_clone, &query_data_clone)]).await;
}
});
*interval_task = Some((interval, task));
}
query_data.clone()
}
/// Releases the background tasks of `query` after one of its users stopped using it.
///
/// Cancels the interval task of the entry and, when no reactive context is subscribed,
/// schedules a clean task that removes the entry from storage after `query.clean_time`.
/// Does nothing when the entry is no longer in storage.
fn update_tasks(&mut self, query: Query<Q>) {
    let mut storage_clone = self.storage;
    let mut storage = self.storage.write();
    // A clean task may already have removed the entry (for instance when several users
    // shared it); there is nothing left to release in that case.
    let Some(query_data) = storage.get_mut(&query) else {
        return;
    };
    if let Some((_, interval_task)) = query_data.interval_task.take() {
        interval_task.cancel();
    }
    if query_data.reactive_contexts.lock().unwrap().is_empty() {
        let clean_task = spawn_forever(async move {
            time::sleep(query.clean_time).await;
            let mut storage = storage_clone.write();
            storage.remove(&query);
        });
        // Cancel the timer being replaced; otherwise it would keep running untracked and
        // could remove the entry after it has been picked up again.
        if let Some(previous) = query_data.clean_task.borrow_mut().replace(clean_task) {
            previous.cancel();
        }
    }
}
pub async fn get(get_query: GetQuery<Q>) -> QueryReader<Q> {
let query: Query<Q> = get_query.into();
let mut storage = match try_consume_context::<QueriesStorage<Q>>() {
Some(storage) => storage,
None => provide_root_context(QueriesStorage::<Q>::new_in_root()),
};
let query_data = storage
.storage
.write()
.entry(query.clone())
.or_insert_with(|| QueryData {
state: Rc::new(RefCell::new(QueryStateData::Pending)),
reactive_contexts: Arc::default(),
suspense_task: Rc::default(),
interval_task: Rc::default(),
clean_task: Rc::default(),
})
.clone();
if query_data.state.borrow().is_stale(&query) {
let res = mem::replace(&mut *query_data.state.borrow_mut(), QueryStateData::Pending)
.into_loading();
*query_data.state.borrow_mut() = res;
for reactive_context in query_data.reactive_contexts.lock().unwrap().iter() {
reactive_context.mark_dirty();
}
let res = query.query.run(&query.keys).await;
*query_data.state.borrow_mut() = QueryStateData::Settled {
res,
settlement_instant: Instant::now(),
};
for reactive_context in query_data.reactive_contexts.lock().unwrap().iter() {
reactive_context.mark_dirty();
}
if let Some(suspense_task) = &*query_data.suspense_task.borrow() {
suspense_task.notifier.notify_waiters();
};
}
if query_data.reactive_contexts.lock().unwrap().is_empty() {
*query_data.clean_task.borrow_mut() = Some(spawn_forever(async move {
time::sleep(query.clean_time).await;
let mut storage = storage.storage.write();
storage.remove(&query);
}));
}
QueryReader {
state: query_data.state,
}
}
pub async fn invalidate_all() {
let storage = consume_context::<QueriesStorage<Q>>();
let matching_queries = storage
.storage
.read()
.clone()
.into_iter()
.collect::<Vec<_>>();
let matching_queries = matching_queries
.iter()
.map(|(q, d)| (q, d))
.collect::<Vec<_>>();
Self::run_queries(&matching_queries).await
}
pub async fn invalidate_matching(matching_keys: Q::Keys) {
let storage = consume_context::<QueriesStorage<Q>>();
let mut matching_queries = Vec::new();
for (query, data) in storage.storage.read().iter() {
if query.query.matches(&matching_keys) {
matching_queries.push((query.clone(), data.clone()));
}
}
let matching_queries = matching_queries
.iter()
.map(|(q, d)| (q, d))
.collect::<Vec<_>>();
Self::run_queries(&matching_queries).await
}
/// Runs all `queries` concurrently and returns once every one of them has settled.
///
/// Each query is first switched to `Loading` (keeping any previous result) and its
/// subscribers are marked dirty. When its run finishes the state becomes `Settled`,
/// subscribers are marked dirty again and any suspense notifier is notified.
async fn run_queries(queries: &[(&Query<Q>, &QueryData<Q>)]) {
    let in_flight = FuturesUnordered::new();
    for &(query, data) in queries {
        {
            let mut state = data.state.borrow_mut();
            let previous = mem::replace(&mut *state, QueryStateData::Pending);
            *state = previous.into_loading();
        }
        for subscriber in data.reactive_contexts.lock().unwrap().iter() {
            subscriber.mark_dirty();
        }
        in_flight.push(Box::pin(async move {
            let outcome = query.query.run(&query.keys).await;
            *data.state.borrow_mut() = QueryStateData::Settled {
                res: outcome,
                settlement_instant: Instant::now(),
            };
            for subscriber in data.reactive_contexts.lock().unwrap().iter() {
                subscriber.mark_dirty();
            }
            if let Some(suspense) = data.suspense_task.borrow().as_ref() {
                suspense.notifier.notify_waiters();
            }
        }));
    }
    // Drive every pushed future to completion; the count itself is not needed.
    in_flight.count().await;
}
}
/// Description of a one-off query, consumed by `QueriesStorage::get`.
pub struct GetQuery<Q: QueryCapability> {
// Capability that runs the query.
query: Q,
// Keys passed to the capability.
keys: Q::Keys,
// How long a settled result is considered fresh.
stale_time: Duration,
// How long to wait before removing the query from storage once it is unused.
clean_time: Duration,
}
impl<Q: QueryCapability> GetQuery<Q> {
    /// Builds a one-off query with both `stale_time` and `clean_time` set to zero.
    pub fn new(keys: Q::Keys, query: Q) -> Self {
        let zero = Duration::ZERO;
        Self {
            query,
            keys,
            stale_time: zero,
            clean_time: zero,
        }
    }

    /// Sets how long a settled result is considered fresh.
    pub fn stale_time(mut self, stale_time: Duration) -> Self {
        self.stale_time = stale_time;
        self
    }

    /// Sets how long the query is kept in storage once it is unused.
    pub fn clean_time(mut self, clean_time: Duration) -> Self {
        self.clean_time = clean_time;
        self
    }
}
impl<Q: QueryCapability> From<GetQuery<Q>> for Query<Q> {
fn from(value: GetQuery<Q>) -> Self {
Query {
query: value.query,
keys: value.keys,
enabled: true,
stale_time: value.stale_time,
clean_time: value.clean_time,
interval_time: Duration::MAX,
}
}
}
/// A query together with the settings that control how it is run and cached.
#[derive(PartialEq, Clone)]
pub struct Query<Q: QueryCapability> {
// Capability that runs the query.
query: Q,
// Keys passed to the capability.
keys: Q::Keys,
// When `false`, `use_query` does not run the query automatically.
enabled: bool,
// How long a settled result is considered fresh.
stale_time: Duration,
// How long to wait before removing the query from storage once it is unused.
clean_time: Duration,
// Period of automatic re-runs; `Duration::MAX` means no interval.
interval_time: Duration,
}
impl<Q: QueryCapability> Eq for Query<Q> {}

/// Hashes every field except `interval_time`.
impl<Q: QueryCapability> Hash for Query<Q> {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        let Self {
            query,
            keys,
            enabled,
            stale_time,
            clean_time,
            // Not part of the hash.
            interval_time: _,
        } = self;
        query.hash(state);
        keys.hash(state);
        enabled.hash(state);
        stale_time.hash(state);
        clean_time.hash(state);
    }
}
impl<Q: QueryCapability> Query<Q> {
    /// Builds an enabled query with a zero `stale_time`, a five-minute `clean_time`
    /// and no interval (`Duration::MAX`).
    pub fn new(keys: Q::Keys, query: Q) -> Self {
        let five_minutes = Duration::from_secs(5 * 60);
        Self {
            query,
            keys,
            enabled: true,
            stale_time: Duration::ZERO,
            clean_time: five_minutes,
            interval_time: Duration::MAX,
        }
    }

    /// Sets whether `use_query` runs the query automatically.
    pub fn enable(mut self, enabled: bool) -> Self {
        self.enabled = enabled;
        self
    }

    /// Sets how long a settled result is considered fresh.
    pub fn stale_time(mut self, stale_time: Duration) -> Self {
        self.stale_time = stale_time;
        self
    }

    /// Sets how long the query is kept in storage once it is unused.
    pub fn clean_time(mut self, clean_time: Duration) -> Self {
        self.clean_time = clean_time;
        self
    }

    /// Sets the period of automatic re-runs; `Duration::MAX` disables them.
    pub fn interval_time(mut self, interval_time: Duration) -> Self {
        self.interval_time = interval_time;
        self
    }
}
/// Read access to the shared state of a query.
pub struct QueryReader<Q: QueryCapability> {
// Same cell as the one stored in the query's `QueryData`.
state: Rc<RefCell<QueryStateData<Q>>>,
}
impl<Q: QueryCapability> QueryReader<Q> {
    /// Borrows the current state of the query.
    pub fn state(&self) -> Ref<'_, QueryStateData<Q>> {
        self.state.borrow()
    }

    /// Borrows the result of a settled query.
    ///
    /// # Panics
    ///
    /// Panics when the query is not in the `Settled` state.
    pub fn as_settled(&self) -> Ref<'_, Result<Q::Ok, Q::Err>> {
        Ref::map(self.state.borrow(), |state| {
            if let QueryStateData::Settled { res, .. } = state {
                res
            } else {
                panic!("Query is not settled.")
            }
        })
    }
}
/// Copyable handle returned by `use_query`.
pub struct UseQuery<Q: QueryCapability> {
// The query this handle currently refers to; replaced when the hook receives a different one.
query: Signal<Query<Q>>,
}
/// Implemented by hand so that `Q` itself is not required to be `Copy`.
impl<Q: QueryCapability> Copy for UseQuery<Q> {}

/// Cloning copies the handle; it still refers to the same signal.
impl<Q: QueryCapability> Clone for UseQuery<Q> {
    fn clone(&self) -> Self {
        *self
    }
}
impl<Q: QueryCapability> UseQuery<Q> {
pub fn read(&self) -> QueryReader<Q> {
let storage = consume_context::<QueriesStorage<Q>>();
let query_data = storage
.storage
.peek_unchecked()
.get(&self.query.peek())
.cloned()
.unwrap();
if let Some(reactive_context) = ReactiveContext::current() {
reactive_context.subscribe(query_data.reactive_contexts);
}
QueryReader {
state: query_data.state,
}
}
pub fn peek(&self) -> QueryReader<Q> {
let storage = consume_context::<QueriesStorage<Q>>();
let query_data = storage
.storage
.peek_unchecked()
.get(&self.query.peek())
.cloned()
.unwrap();
QueryReader {
state: query_data.state,
}
}
/// Returns a clone of the query's result, or suspends rendering until one is available.
///
/// Subscribes the current reactive context, if there is one, to the query. While the
/// state is `Pending` or `Loading` without a previous result, this returns
/// `RenderError::Suspended` with a task that completes once the query's notifier fires.
///
/// Panics when the query is not present in the storage.
pub fn suspend(&self) -> Result<Result<Q::Ok, Q::Err>, RenderError>
where
Q::Ok: Clone,
Q::Err: Clone,
{
let storage = consume_context::<QueriesStorage<Q>>();
let mut storage = storage.storage.write_unchecked();
let query_data = storage.get_mut(&self.query.peek()).unwrap();
if let Some(reactive_context) = ReactiveContext::current() {
reactive_context.subscribe(query_data.reactive_contexts.clone());
}
let state = &*query_data.state.borrow();
match state {
// No result to show yet: suspend on the shared suspense task.
QueryStateData::Pending | QueryStateData::Loading { res: None } => {
let suspense_task_clone = query_data.suspense_task.clone();
let mut suspense_task = query_data.suspense_task.borrow_mut();
// Reuse the existing suspense task, or create it on first use.
let QuerySuspenseData { task, .. } = suspense_task.get_or_insert_with(|| {
let notifier = Arc::new(Notify::new());
let task = spawn({
let notifier = notifier.clone();
async move {
notifier.notified().await;
// Clear the slot so a later suspension creates a fresh task.
let _ = suspense_task_clone.borrow_mut().take();
}
});
QuerySuspenseData { notifier, task }
});
Err(RenderError::Suspended(SuspendedFuture::new(*task)))
}
// A result is available, possibly a previous one while a new run is loading.
QueryStateData::Settled { res, .. } | QueryStateData::Loading { res: Some(res) } => {
Ok(res.clone())
}
}
}
/// Re-runs the query, waits for it to settle and returns a reader for its state.
///
/// # Panics
///
/// Panics when the query is not present in the storage.
pub async fn invalidate_async(&self) -> QueryReader<Q> {
    let storage = consume_context::<QueriesStorage<Q>>();
    let current = self.query.peek().clone();
    let data = storage
        .storage
        .peek_unchecked()
        .get(&current)
        .cloned()
        .unwrap();
    QueriesStorage::run_queries(&[(&current, &data)]).await;
    let state = data.state.clone();
    QueryReader { state }
}
/// Re-runs the query in a spawned task, without waiting for the result.
///
/// # Panics
///
/// Panics when the query is not present in the storage.
pub fn invalidate(&self) {
    let storage = consume_context::<QueriesStorage<Q>>();
    let current = self.query.peek().clone();
    let data = storage
        .storage
        .peek_unchecked()
        .get(&current)
        .cloned()
        .unwrap();
    spawn(async move {
        let targets = [(&current, &data)];
        QueriesStorage::run_queries(&targets).await
    });
}
}
/// Hook that registers `query` in the shared [`QueriesStorage`] and returns a handle to it.
///
/// The query is run in a spawned task when it is enabled and its stored state is stale.
/// When a later call passes a different `query`, the new one is registered and the tasks
/// of the previous one are released. The tasks of the current query are also released
/// through `use_drop`.
pub fn use_query<Q: QueryCapability>(query: Query<Q>) -> UseQuery<Q> {
// Reuse the storage from context, or create it and provide it in the root.
let mut storage = match try_consume_context::<QueriesStorage<Q>>() {
Some(storage) => storage,
None => provide_root_context(QueriesStorage::<Q>::new_in_root()),
};
// Registers `query`, releases the tasks of `prev_query` (if any) and starts a run when needed.
let mut make_query = |query: &Query<Q>, mut prev_query: Option<Query<Q>>| {
let query_data = storage.insert_or_get_query(query.clone());
if let Some(prev_query) = prev_query.take() {
storage.update_tasks(prev_query);
}
if query.enabled && query_data.state.borrow().is_stale(query) {
let query = query.clone();
spawn(async move {
QueriesStorage::run_queries(&[(&query, &query_data)]).await;
});
}
};
// Inside the hook initializer: register the query and remember it in a signal.
let mut current_query = use_hook(|| {
make_query(&query, None);
Signal::new(query.clone())
});
// The caller passed a different query than the remembered one: switch over to it.
if *current_query.read() != query {
let prev = mem::replace(&mut *current_query.write(), query.clone());
make_query(&query, Some(prev));
}
// Release the tasks of whichever query is current when the drop callback runs.
use_drop({
move || {
storage.update_tasks(current_query.peek().clone());
}
});
UseQuery {
query: current_query,
}
}