1use core::fmt;
2use std::{
3 cell::{Ref, RefCell},
4 collections::{HashMap, HashSet},
5 future::Future,
6 hash::Hash,
7 mem,
8 rc::Rc,
9 sync::{Arc, Mutex},
10 time::Duration,
11};
12
13use dioxus::prelude::*;
14use dioxus::{
15 hooks::{use_memo, use_reactive},
16 signals::CopyValue,
17};
18use dioxus_core::{
19 provide_root_context, spawn_forever, use_drop, ReactiveContext, SuspendedFuture, Task,
20};
21use futures_util::stream::{FuturesUnordered, StreamExt};
22use tokio::sync::Notify;
23#[cfg(not(target_family = "wasm"))]
24use tokio::time;
25#[cfg(not(target_family = "wasm"))]
26use tokio::time::Instant;
27#[cfg(target_family = "wasm")]
28use wasmtimer::tokio as time;
29#[cfg(target_family = "wasm")]
30use web_time::Instant;
31
32pub trait QueryCapability
33where
34 Self: 'static + Clone + PartialEq + Hash + Eq,
35{
36 type Ok;
37 type Err;
38 type Keys: Hash + PartialEq + Clone;
39
40 fn run(&self, keys: &Self::Keys) -> impl Future<Output = Result<Self::Ok, Self::Err>>;
42
43 fn matches(&self, _keys: &Self::Keys) -> bool {
45 true
46 }
47}
48
49pub enum QueryStateData<Q: QueryCapability> {
50 Pending,
52 Loading { res: Option<Result<Q::Ok, Q::Err>> },
54 Settled {
56 res: Result<Q::Ok, Q::Err>,
57 settlement_instant: Instant,
58 },
59}
60
61impl<Q: QueryCapability> TryFrom<QueryStateData<Q>> for Result<Q::Ok, Q::Err> {
62 type Error = ();
63
64 fn try_from(value: QueryStateData<Q>) -> Result<Self, Self::Error> {
65 match value {
66 QueryStateData::Loading { res: Some(res) } => Ok(res),
67 QueryStateData::Settled { res, .. } => Ok(res),
68 _ => Err(()),
69 }
70 }
71}
72
73impl<Q> fmt::Debug for QueryStateData<Q>
74where
75 Q: QueryCapability,
76 Q::Ok: fmt::Debug,
77 Q::Err: fmt::Debug,
78{
79 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
80 match self {
81 Self::Pending => f.write_str("Pending"),
82 Self::Loading { res } => write!(f, "Loading {{ {res:?} }}"),
83 Self::Settled { res, .. } => write!(f, "Settled {{ {res:?} }}"),
84 }
85 }
86}
87
88impl<Q: QueryCapability> QueryStateData<Q> {
89 pub fn is_ok(&self) -> bool {
91 matches!(self, QueryStateData::Settled { res: Ok(_), .. })
92 }
93
94 pub fn is_err(&self) -> bool {
96 matches!(self, QueryStateData::Settled { res: Err(_), .. })
97 }
98
99 pub fn is_loading(&self) -> bool {
101 matches!(self, QueryStateData::Loading { .. })
102 }
103
104 pub fn is_pending(&self) -> bool {
106 matches!(self, QueryStateData::Pending)
107 }
108
109 pub fn is_stale(&self, query: &Query<Q>) -> bool {
111 match self {
112 QueryStateData::Pending => true,
113 QueryStateData::Loading { .. } => true,
114 QueryStateData::Settled {
115 settlement_instant, ..
116 } => Instant::now().duration_since(*settlement_instant) >= query.stale_time,
117 }
118 }
119
120 pub fn ok(&self) -> Option<&Q::Ok> {
122 match self {
123 Self::Settled { res: Ok(res), .. } => Some(res),
124 Self::Loading { res: Some(Ok(res)) } => Some(res),
125 _ => None,
126 }
127 }
128
129 pub fn unwrap(&self) -> &Result<Q::Ok, Q::Err> {
131 match self {
132 Self::Loading { res: Some(v) } => v,
133 Self::Settled { res, .. } => res,
134 _ => unreachable!(),
135 }
136 }
137
138 fn into_loading(self) -> QueryStateData<Q> {
139 match self {
140 QueryStateData::Pending => QueryStateData::Loading { res: None },
141 QueryStateData::Loading { res } => QueryStateData::Loading { res },
142 QueryStateData::Settled { res, .. } => QueryStateData::Loading { res: Some(res) },
143 }
144 }
145}
146pub struct QueriesStorage<Q: QueryCapability> {
147 storage: CopyValue<HashMap<Query<Q>, QueryData<Q>>>,
148}
149
150impl<Q: QueryCapability> Copy for QueriesStorage<Q> {}
151
152impl<Q: QueryCapability> Clone for QueriesStorage<Q> {
153 fn clone(&self) -> Self {
154 *self
155 }
156}
157
158struct QuerySuspenseData {
159 notifier: Arc<Notify>,
160 task: Task,
161}
162
163pub struct QueryData<Q: QueryCapability> {
164 state: Rc<RefCell<QueryStateData<Q>>>,
165 reactive_contexts: Arc<Mutex<HashSet<ReactiveContext>>>,
166
167 suspense_task: Rc<RefCell<Option<QuerySuspenseData>>>,
168 interval_task: Rc<RefCell<Option<(Duration, Task)>>>,
169 clean_task: Rc<RefCell<Option<Task>>>,
170}
171
172impl<Q: QueryCapability> Clone for QueryData<Q> {
173 fn clone(&self) -> Self {
174 Self {
175 state: self.state.clone(),
176 reactive_contexts: self.reactive_contexts.clone(),
177
178 suspense_task: self.suspense_task.clone(),
179 interval_task: self.interval_task.clone(),
180 clean_task: self.clean_task.clone(),
181 }
182 }
183}
184
185impl<Q: QueryCapability> QueriesStorage<Q> {
186 fn new_in_root() -> Self {
187 Self {
188 storage: CopyValue::new_in_scope(HashMap::default(), ScopeId::ROOT),
189 }
190 }
191
192 fn insert_or_get_query(&mut self, query: Query<Q>) -> QueryData<Q> {
193 let query_clone = query.clone();
194 let mut storage = self.storage.write();
195
196 let query_data = storage.entry(query).or_insert_with(|| QueryData {
197 state: Rc::new(RefCell::new(QueryStateData::Pending)),
198 reactive_contexts: Arc::default(),
199 suspense_task: Rc::default(),
200 interval_task: Rc::default(),
201 clean_task: Rc::default(),
202 });
203 let query_data_clone = query_data.clone();
204
205 if let Some(clean_task) = query_data.clean_task.take() {
207 clean_task.cancel();
208 }
209
210 let interval = query_clone.interval_time;
214 let interval_enabled = query_clone.interval_time != Duration::MAX;
215 let interval_task = &mut *query_data.interval_task.borrow_mut();
216
217 let create_interval_task = match interval_task {
218 None if interval_enabled => true,
219 Some((current_interval, current_interval_task)) if interval_enabled => {
220 let new_interval_is_shorter = *current_interval > interval;
221 if new_interval_is_shorter {
222 current_interval_task.cancel();
223 *interval_task = None;
224 }
225 new_interval_is_shorter
226 }
227 _ => false,
228 };
229 if create_interval_task {
230 let task = spawn_forever(async move {
231 loop {
232 time::sleep(interval).await;
234
235 QueriesStorage::<Q>::run_queries(&[(&query_clone, &query_data_clone)]).await;
237 }
238 });
239 *interval_task = Some((interval, task));
240 }
241
242 query_data.clone()
243 }
244
245 fn update_tasks(&mut self, query: Query<Q>) {
246 let mut storage_clone = self.storage;
247 let mut storage = self.storage.write();
248
249 let query_data = storage.get_mut(&query).unwrap();
250
251 if let Some((_, interval_task)) = query_data.interval_task.take() {
253 interval_task.cancel();
254 }
255
256 if query_data.reactive_contexts.lock().unwrap().is_empty() {
258 *query_data.clean_task.borrow_mut() = Some(spawn_forever(async move {
259 time::sleep(query.clean_time).await;
261
262 let mut storage = storage_clone.write();
264 storage.remove(&query);
265 }));
266 }
267 }
268
269 pub async fn get(get_query: GetQuery<Q>) -> QueryReader<Q> {
270 let query: Query<Q> = get_query.into();
271
272 let mut storage = match try_consume_context::<QueriesStorage<Q>>() {
273 Some(storage) => storage,
274 None => provide_root_context(QueriesStorage::<Q>::new_in_root()),
275 };
276
277 let query_data = storage
278 .storage
279 .write()
280 .entry(query.clone())
281 .or_insert_with(|| QueryData {
282 state: Rc::new(RefCell::new(QueryStateData::Pending)),
283 reactive_contexts: Arc::default(),
284 suspense_task: Rc::default(),
285 interval_task: Rc::default(),
286 clean_task: Rc::default(),
287 })
288 .clone();
289
290 if query_data.state.borrow().is_stale(&query) {
292 let res = mem::replace(&mut *query_data.state.borrow_mut(), QueryStateData::Pending)
294 .into_loading();
295 *query_data.state.borrow_mut() = res;
296 for reactive_context in query_data.reactive_contexts.lock().unwrap().iter() {
297 reactive_context.mark_dirty();
298 }
299
300 let res = query.query.run(&query.keys).await;
302
303 *query_data.state.borrow_mut() = QueryStateData::Settled {
305 res,
306 settlement_instant: Instant::now(),
307 };
308 for reactive_context in query_data.reactive_contexts.lock().unwrap().iter() {
309 reactive_context.mark_dirty();
310 }
311
312 if let Some(suspense_task) = &*query_data.suspense_task.borrow() {
314 suspense_task.notifier.notify_waiters();
315 };
316 }
317
318 if query_data.reactive_contexts.lock().unwrap().is_empty() {
320 *query_data.clean_task.borrow_mut() = Some(spawn_forever(async move {
321 time::sleep(query.clean_time).await;
323
324 let mut storage = storage.storage.write();
326 storage.remove(&query);
327 }));
328 }
329
330 QueryReader {
331 state: query_data.state,
332 }
333 }
334
335 pub async fn invalidate_all() {
336 let storage = consume_context::<QueriesStorage<Q>>();
337
338 let matching_queries = storage
340 .storage
341 .read()
342 .clone()
343 .into_iter()
344 .collect::<Vec<_>>();
345 let matching_queries = matching_queries
346 .iter()
347 .map(|(q, d)| (q, d))
348 .collect::<Vec<_>>();
349
350 Self::run_queries(&matching_queries).await
352 }
353
354 pub async fn invalidate_matching(matching_keys: Q::Keys) {
355 let storage = consume_context::<QueriesStorage<Q>>();
356
357 let mut matching_queries = Vec::new();
359 for (query, data) in storage.storage.read().iter() {
360 if query.query.matches(&matching_keys) {
361 matching_queries.push((query.clone(), data.clone()));
362 }
363 }
364 let matching_queries = matching_queries
365 .iter()
366 .map(|(q, d)| (q, d))
367 .collect::<Vec<_>>();
368
369 Self::run_queries(&matching_queries).await
371 }
372
373 async fn run_queries(queries: &[(&Query<Q>, &QueryData<Q>)]) {
374 let tasks = FuturesUnordered::new();
375
376 for (query, query_data) in queries {
377 let res = mem::replace(&mut *query_data.state.borrow_mut(), QueryStateData::Pending)
379 .into_loading();
380 *query_data.state.borrow_mut() = res;
381 for reactive_context in query_data.reactive_contexts.lock().unwrap().iter() {
382 reactive_context.mark_dirty();
383 }
384
385 tasks.push(Box::pin(async move {
386 let res = query.query.run(&query.keys).await;
388
389 *query_data.state.borrow_mut() = QueryStateData::Settled {
391 res,
392 settlement_instant: Instant::now(),
393 };
394 for reactive_context in query_data.reactive_contexts.lock().unwrap().iter() {
395 reactive_context.mark_dirty();
396 }
397
398 if let Some(suspense_task) = &*query_data.suspense_task.borrow() {
400 suspense_task.notifier.notify_waiters();
401 };
402 }));
403 }
404
405 tasks.count().await;
406 }
407}
408
409pub struct GetQuery<Q: QueryCapability> {
410 query: Q,
411 keys: Q::Keys,
412
413 stale_time: Duration,
414 clean_time: Duration,
415}
416
417impl<Q: QueryCapability> GetQuery<Q> {
418 pub fn new(keys: Q::Keys, query: Q) -> Self {
419 Self {
420 query,
421 keys,
422 stale_time: Duration::ZERO,
423 clean_time: Duration::ZERO,
424 }
425 }
426 pub fn stale_time(self, stale_time: Duration) -> Self {
430 Self { stale_time, ..self }
431 }
432
433 pub fn clean_time(self, clean_time: Duration) -> Self {
437 Self { clean_time, ..self }
438 }
439}
440
441impl<Q: QueryCapability> From<GetQuery<Q>> for Query<Q> {
442 fn from(value: GetQuery<Q>) -> Self {
443 Query {
444 query: value.query,
445 keys: value.keys,
446
447 enabled: true,
448
449 stale_time: value.stale_time,
450 clean_time: value.clean_time,
451 interval_time: Duration::MAX,
452 }
453 }
454}
455#[derive(PartialEq, Clone)]
456pub struct Query<Q: QueryCapability> {
457 query: Q,
458 keys: Q::Keys,
459
460 enabled: bool,
461
462 stale_time: Duration,
463 clean_time: Duration,
464 interval_time: Duration,
465}
466
467impl<Q: QueryCapability> Eq for Query<Q> {}
468impl<Q: QueryCapability> Hash for Query<Q> {
469 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
470 self.query.hash(state);
471 self.keys.hash(state);
472
473 self.enabled.hash(state);
474
475 self.stale_time.hash(state);
476 self.clean_time.hash(state);
477
478 }
481}
482
483impl<Q: QueryCapability> Query<Q> {
484 pub fn new(keys: Q::Keys, query: Q) -> Self {
485 Self {
486 query,
487 keys,
488 enabled: true,
489 stale_time: Duration::ZERO,
490 clean_time: Duration::from_secs(5 * 60),
491 interval_time: Duration::MAX,
492 }
493 }
494
495 pub fn enable(self, enabled: bool) -> Self {
499 Self { enabled, ..self }
500 }
501
502 pub fn stale_time(self, stale_time: Duration) -> Self {
507 Self { stale_time, ..self }
508 }
509
510 pub fn clean_time(self, clean_time: Duration) -> Self {
514 Self { clean_time, ..self }
515 }
516
517 pub fn interval_time(self, interval_time: Duration) -> Self {
523 Self {
524 interval_time,
525 ..self
526 }
527 }
528}
529
530pub struct QueryReader<Q: QueryCapability> {
531 state: Rc<RefCell<QueryStateData<Q>>>,
532}
533
534impl<Q: QueryCapability> QueryReader<Q> {
535 pub fn state(&self) -> Ref<QueryStateData<Q>> {
536 self.state.borrow()
537 }
538
539 pub fn as_settled(&self) -> Ref<Result<Q::Ok, Q::Err>> {
543 Ref::map(self.state.borrow(), |state| match state {
544 QueryStateData::Settled { res, .. } => res,
545 _ => panic!("Query is not settled."),
546 })
547 }
548}
549
550pub struct UseQuery<Q: QueryCapability> {
551 query: Memo<Query<Q>>,
552}
553
554impl<Q: QueryCapability> Clone for UseQuery<Q> {
555 fn clone(&self) -> Self {
556 *self
557 }
558}
559
560impl<Q: QueryCapability> Copy for UseQuery<Q> {}
561
562impl<Q: QueryCapability> UseQuery<Q> {
563 pub fn read(&self) -> QueryReader<Q> {
568 let storage = consume_context::<QueriesStorage<Q>>();
569 let query_data = storage
570 .storage
571 .peek_unchecked()
572 .get(&self.query.peek())
573 .cloned()
574 .unwrap();
575
576 if let Some(reactive_context) = ReactiveContext::current() {
578 reactive_context.subscribe(query_data.reactive_contexts);
579 }
580
581 QueryReader {
582 state: query_data.state,
583 }
584 }
585
586 pub fn peek(&self) -> QueryReader<Q> {
591 let storage = consume_context::<QueriesStorage<Q>>();
592 let query_data = storage
593 .storage
594 .peek_unchecked()
595 .get(&self.query.peek())
596 .cloned()
597 .unwrap();
598
599 QueryReader {
600 state: query_data.state,
601 }
602 }
603
604 pub fn suspend(&self) -> Result<Result<Q::Ok, Q::Err>, RenderError>
608 where
609 Q::Ok: Clone,
610 Q::Err: Clone,
611 {
612 let storage = consume_context::<QueriesStorage<Q>>();
613 let mut storage = storage.storage.write_unchecked();
614 let query_data = storage.get_mut(&self.query.peek()).unwrap();
615
616 if let Some(reactive_context) = ReactiveContext::current() {
618 reactive_context.subscribe(query_data.reactive_contexts.clone());
619 }
620
621 let state = &*query_data.state.borrow();
622 match state {
623 QueryStateData::Pending | QueryStateData::Loading { res: None } => {
624 let suspense_task_clone = query_data.suspense_task.clone();
625 let mut suspense_task = query_data.suspense_task.borrow_mut();
626 let QuerySuspenseData { task, .. } = suspense_task.get_or_insert_with(|| {
627 let notifier = Arc::new(Notify::new());
628 let task = spawn({
629 let notifier = notifier.clone();
630 async move {
631 notifier.notified().await;
632 let _ = suspense_task_clone.borrow_mut().take();
633 }
634 });
635 QuerySuspenseData { notifier, task }
636 });
637 Err(RenderError::Suspended(SuspendedFuture::new(*task)))
638 }
639 QueryStateData::Settled { res, .. } | QueryStateData::Loading { res: Some(res) } => {
640 Ok(res.clone())
641 }
642 }
643 }
644
645 pub async fn invalidate_async(&self) -> QueryReader<Q> {
649 let storage = consume_context::<QueriesStorage<Q>>();
650
651 let query = self.query.peek().clone();
652 let query_data = storage
653 .storage
654 .peek_unchecked()
655 .get(&query)
656 .cloned()
657 .unwrap();
658
659 QueriesStorage::run_queries(&[(&query, &query_data)]).await;
661
662 QueryReader {
663 state: query_data.state.clone(),
664 }
665 }
666
667 pub fn invalidate(&self) {
671 let storage = consume_context::<QueriesStorage<Q>>();
672
673 let query = self.query.peek().clone();
674 let query_data = storage
675 .storage
676 .peek_unchecked()
677 .get(&query)
678 .cloned()
679 .unwrap();
680
681 spawn(async move { QueriesStorage::run_queries(&[(&query, &query_data)]).await });
683 }
684}
685
686pub fn use_query<Q: QueryCapability>(query: Query<Q>) -> UseQuery<Q> {
714 let mut storage = match try_consume_context::<QueriesStorage<Q>>() {
715 Some(storage) => storage,
716 None => provide_root_context(QueriesStorage::<Q>::new_in_root()),
717 };
718
719 let current_query = use_hook(|| Rc::new(RefCell::new(None)));
720
721 let query = use_memo(use_reactive!(|query| {
722 let query_data = storage.insert_or_get_query(query.clone());
723
724 if let Some(prev_query) = current_query.borrow_mut().take() {
726 storage.update_tasks(prev_query);
727 }
728
729 current_query.borrow_mut().replace(query.clone());
731
732 if query.enabled && query_data.state.borrow().is_stale(&query) {
734 let query = query.clone();
735 spawn(async move {
736 QueriesStorage::run_queries(&[(&query, &query_data)]).await;
737 });
738 }
739
740 query
741 }));
742
743 use_drop({
745 move || {
746 storage.update_tasks(query.peek().clone());
747 }
748 });
749
750 UseQuery { query }
751}