1use core::fmt;
2use std::{
3 cell::{Ref, RefCell},
4 collections::{HashMap, HashSet},
5 future::Future,
6 hash::Hash,
7 mem,
8 rc::Rc,
9 sync::{Arc, Mutex},
10 time::Duration,
11};
12
13use dioxus::prelude::*;
14use dioxus::signals::CopyValue;
15use dioxus_core::{
16 provide_root_context, spawn_forever, use_drop, ReactiveContext, SuspendedFuture, Task,
17};
18use futures_util::stream::{FuturesUnordered, StreamExt};
19use tokio::sync::Notify;
20#[cfg(not(target_family = "wasm"))]
21use tokio::time;
22#[cfg(not(target_family = "wasm"))]
23use tokio::time::Instant;
24#[cfg(target_family = "wasm")]
25use wasmtimer::tokio as time;
26#[cfg(target_family = "wasm")]
27use web_time::Instant;
28
29pub trait QueryCapability
30where
31 Self: 'static + Clone + PartialEq + Hash + Eq,
32{
33 type Ok;
34 type Err;
35 type Keys: Hash + PartialEq + Clone;
36
37 fn run(&self, keys: &Self::Keys) -> impl Future<Output = Result<Self::Ok, Self::Err>>;
39
40 fn matches(&self, _keys: &Self::Keys) -> bool {
42 true
43 }
44}
45
46pub enum QueryStateData<Q: QueryCapability> {
47 Pending,
49 Loading { res: Option<Result<Q::Ok, Q::Err>> },
51 Settled {
53 res: Result<Q::Ok, Q::Err>,
54 settlement_instant: Instant,
55 },
56}
57
58impl<Q: QueryCapability> TryFrom<QueryStateData<Q>> for Result<Q::Ok, Q::Err> {
59 type Error = ();
60
61 fn try_from(value: QueryStateData<Q>) -> Result<Self, Self::Error> {
62 match value {
63 QueryStateData::Loading { res: Some(res) } => Ok(res),
64 QueryStateData::Settled { res, .. } => Ok(res),
65 _ => Err(()),
66 }
67 }
68}
69
70impl<Q> fmt::Debug for QueryStateData<Q>
71where
72 Q: QueryCapability,
73 Q::Ok: fmt::Debug,
74 Q::Err: fmt::Debug,
75{
76 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77 match self {
78 Self::Pending => f.write_str("Pending"),
79 Self::Loading { res } => write!(f, "Loading {{ {res:?} }}"),
80 Self::Settled { res, .. } => write!(f, "Settled {{ {res:?} }}"),
81 }
82 }
83}
84
85impl<Q: QueryCapability> QueryStateData<Q> {
86 pub fn is_ok(&self) -> bool {
88 matches!(self, QueryStateData::Settled { res: Ok(_), .. })
89 }
90
91 pub fn is_err(&self) -> bool {
93 matches!(self, QueryStateData::Settled { res: Err(_), .. })
94 }
95
96 pub fn is_loading(&self) -> bool {
98 matches!(self, QueryStateData::Loading { .. })
99 }
100
101 pub fn is_pending(&self) -> bool {
103 matches!(self, QueryStateData::Pending)
104 }
105
106 pub fn is_stale(&self, query: &Query<Q>) -> bool {
108 match self {
109 QueryStateData::Pending => true,
110 QueryStateData::Loading { .. } => true,
111 QueryStateData::Settled {
112 settlement_instant, ..
113 } => Instant::now().duration_since(*settlement_instant) >= query.stale_time,
114 }
115 }
116
117 pub fn ok(&self) -> Option<&Q::Ok> {
119 match self {
120 Self::Settled { res: Ok(res), .. } => Some(res),
121 Self::Loading { res: Some(Ok(res)) } => Some(res),
122 _ => None,
123 }
124 }
125
126 pub fn unwrap(&self) -> &Result<Q::Ok, Q::Err> {
128 match self {
129 Self::Loading { res: Some(v) } => v,
130 Self::Settled { res, .. } => res,
131 _ => unreachable!(),
132 }
133 }
134
135 fn into_loading(self) -> QueryStateData<Q> {
136 match self {
137 QueryStateData::Pending => QueryStateData::Loading { res: None },
138 QueryStateData::Loading { res } => QueryStateData::Loading { res },
139 QueryStateData::Settled { res, .. } => QueryStateData::Loading { res: Some(res) },
140 }
141 }
142}
143pub struct QueriesStorage<Q: QueryCapability> {
144 storage: CopyValue<HashMap<Query<Q>, QueryData<Q>>>,
145}
146
147impl<Q: QueryCapability> Copy for QueriesStorage<Q> {}
148
149impl<Q: QueryCapability> Clone for QueriesStorage<Q> {
150 fn clone(&self) -> Self {
151 *self
152 }
153}
154
155struct QuerySuspenseData {
156 notifier: Arc<Notify>,
157 task: Task,
158}
159
160pub struct QueryData<Q: QueryCapability> {
161 state: Rc<RefCell<QueryStateData<Q>>>,
162 reactive_contexts: Arc<Mutex<HashSet<ReactiveContext>>>,
163
164 suspense_task: Rc<RefCell<Option<QuerySuspenseData>>>,
165 interval_task: Rc<RefCell<Option<(Duration, Task)>>>,
166 clean_task: Rc<RefCell<Option<Task>>>,
167}
168
169impl<Q: QueryCapability> Clone for QueryData<Q> {
170 fn clone(&self) -> Self {
171 Self {
172 state: self.state.clone(),
173 reactive_contexts: self.reactive_contexts.clone(),
174
175 suspense_task: self.suspense_task.clone(),
176 interval_task: self.interval_task.clone(),
177 clean_task: self.clean_task.clone(),
178 }
179 }
180}
181
182impl<Q: QueryCapability> QueriesStorage<Q> {
183 fn new_in_root() -> Self {
184 Self {
185 storage: CopyValue::new_in_scope(HashMap::default(), ScopeId::ROOT),
186 }
187 }
188
189 fn insert_or_get_query(&mut self, query: Query<Q>) -> QueryData<Q> {
190 let query_clone = query.clone();
191 let mut storage = self.storage.write();
192
193 let query_data = storage.entry(query).or_insert_with(|| QueryData {
194 state: Rc::new(RefCell::new(QueryStateData::Pending)),
195 reactive_contexts: Arc::default(),
196 suspense_task: Rc::default(),
197 interval_task: Rc::default(),
198 clean_task: Rc::default(),
199 });
200 let query_data_clone = query_data.clone();
201
202 if let Some(clean_task) = query_data.clean_task.take() {
204 clean_task.cancel();
205 }
206
207 let interval = query_clone.interval_time;
211 let interval_enabled = query_clone.interval_time != Duration::MAX;
212 let interval_task = &mut *query_data.interval_task.borrow_mut();
213
214 let create_interval_task = match interval_task {
215 None if interval_enabled => true,
216 Some((current_interval, current_interval_task)) if interval_enabled => {
217 let new_interval_is_shorter = *current_interval > interval;
218 if new_interval_is_shorter {
219 current_interval_task.cancel();
220 *interval_task = None;
221 }
222 new_interval_is_shorter
223 }
224 _ => false,
225 };
226 if create_interval_task {
227 let task = spawn_forever(async move {
228 loop {
229 time::sleep(interval).await;
231
232 QueriesStorage::<Q>::run_queries(&[(&query_clone, &query_data_clone)]).await;
234 }
235 });
236 *interval_task = Some((interval, task));
237 }
238
239 query_data.clone()
240 }
241
242 fn update_tasks(&mut self, query: Query<Q>) {
243 let mut storage_clone = self.storage;
244 let mut storage = self.storage.write();
245
246 let query_data = storage.get_mut(&query).unwrap();
247
248 if let Some((_, interval_task)) = query_data.interval_task.take() {
250 interval_task.cancel();
251 }
252
253 if query_data.reactive_contexts.lock().unwrap().is_empty() {
255 *query_data.clean_task.borrow_mut() = Some(spawn_forever(async move {
256 time::sleep(query.clean_time).await;
258
259 let mut storage = storage_clone.write();
261 storage.remove(&query);
262 }));
263 }
264 }
265
266 pub async fn get(get_query: GetQuery<Q>) -> QueryReader<Q> {
267 let query: Query<Q> = get_query.into();
268
269 let mut storage = match try_consume_context::<QueriesStorage<Q>>() {
270 Some(storage) => storage,
271 None => provide_root_context(QueriesStorage::<Q>::new_in_root()),
272 };
273
274 let query_data = storage
275 .storage
276 .write()
277 .entry(query.clone())
278 .or_insert_with(|| QueryData {
279 state: Rc::new(RefCell::new(QueryStateData::Pending)),
280 reactive_contexts: Arc::default(),
281 suspense_task: Rc::default(),
282 interval_task: Rc::default(),
283 clean_task: Rc::default(),
284 })
285 .clone();
286
287 if query_data.state.borrow().is_stale(&query) {
289 let res = mem::replace(&mut *query_data.state.borrow_mut(), QueryStateData::Pending)
291 .into_loading();
292 *query_data.state.borrow_mut() = res;
293 for reactive_context in query_data.reactive_contexts.lock().unwrap().iter() {
294 reactive_context.mark_dirty();
295 }
296
297 let res = query.query.run(&query.keys).await;
299
300 *query_data.state.borrow_mut() = QueryStateData::Settled {
302 res,
303 settlement_instant: Instant::now(),
304 };
305 for reactive_context in query_data.reactive_contexts.lock().unwrap().iter() {
306 reactive_context.mark_dirty();
307 }
308
309 if let Some(suspense_task) = &*query_data.suspense_task.borrow() {
311 suspense_task.notifier.notify_waiters();
312 };
313 }
314
315 if query_data.reactive_contexts.lock().unwrap().is_empty() {
317 *query_data.clean_task.borrow_mut() = Some(spawn_forever(async move {
318 time::sleep(query.clean_time).await;
320
321 let mut storage = storage.storage.write();
323 storage.remove(&query);
324 }));
325 }
326
327 QueryReader {
328 state: query_data.state,
329 }
330 }
331
332 pub async fn invalidate_all() {
333 let storage = consume_context::<QueriesStorage<Q>>();
334
335 let matching_queries = storage
337 .storage
338 .read()
339 .clone()
340 .into_iter()
341 .collect::<Vec<_>>();
342 let matching_queries = matching_queries
343 .iter()
344 .map(|(q, d)| (q, d))
345 .collect::<Vec<_>>();
346
347 Self::run_queries(&matching_queries).await
349 }
350
351 pub async fn invalidate_matching(matching_keys: Q::Keys) {
352 let storage = consume_context::<QueriesStorage<Q>>();
353
354 let mut matching_queries = Vec::new();
356 for (query, data) in storage.storage.read().iter() {
357 if query.query.matches(&matching_keys) {
358 matching_queries.push((query.clone(), data.clone()));
359 }
360 }
361 let matching_queries = matching_queries
362 .iter()
363 .map(|(q, d)| (q, d))
364 .collect::<Vec<_>>();
365
366 Self::run_queries(&matching_queries).await
368 }
369
370 async fn run_queries(queries: &[(&Query<Q>, &QueryData<Q>)]) {
371 let tasks = FuturesUnordered::new();
372
373 for (query, query_data) in queries {
374 let res = mem::replace(&mut *query_data.state.borrow_mut(), QueryStateData::Pending)
376 .into_loading();
377 *query_data.state.borrow_mut() = res;
378 for reactive_context in query_data.reactive_contexts.lock().unwrap().iter() {
379 reactive_context.mark_dirty();
380 }
381
382 tasks.push(Box::pin(async move {
383 let res = query.query.run(&query.keys).await;
385
386 *query_data.state.borrow_mut() = QueryStateData::Settled {
388 res,
389 settlement_instant: Instant::now(),
390 };
391 for reactive_context in query_data.reactive_contexts.lock().unwrap().iter() {
392 reactive_context.mark_dirty();
393 }
394
395 if let Some(suspense_task) = &*query_data.suspense_task.borrow() {
397 suspense_task.notifier.notify_waiters();
398 };
399 }));
400 }
401
402 tasks.count().await;
403 }
404}
405
406pub struct GetQuery<Q: QueryCapability> {
407 query: Q,
408 keys: Q::Keys,
409
410 stale_time: Duration,
411 clean_time: Duration,
412}
413
414impl<Q: QueryCapability> GetQuery<Q> {
415 pub fn new(keys: Q::Keys, query: Q) -> Self {
416 Self {
417 query,
418 keys,
419 stale_time: Duration::ZERO,
420 clean_time: Duration::ZERO,
421 }
422 }
423 pub fn stale_time(self, stale_time: Duration) -> Self {
427 Self { stale_time, ..self }
428 }
429
430 pub fn clean_time(self, clean_time: Duration) -> Self {
434 Self { clean_time, ..self }
435 }
436}
437
438impl<Q: QueryCapability> From<GetQuery<Q>> for Query<Q> {
439 fn from(value: GetQuery<Q>) -> Self {
440 Query {
441 query: value.query,
442 keys: value.keys,
443
444 enabled: true,
445
446 stale_time: value.stale_time,
447 clean_time: value.clean_time,
448 interval_time: Duration::MAX,
449 }
450 }
451}
452#[derive(PartialEq, Clone)]
453pub struct Query<Q: QueryCapability> {
454 query: Q,
455 keys: Q::Keys,
456
457 enabled: bool,
458
459 stale_time: Duration,
460 clean_time: Duration,
461 interval_time: Duration,
462}
463
464impl<Q: QueryCapability> Eq for Query<Q> {}
465impl<Q: QueryCapability> Hash for Query<Q> {
466 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
467 self.query.hash(state);
468 self.keys.hash(state);
469
470 self.enabled.hash(state);
471
472 self.stale_time.hash(state);
473 self.clean_time.hash(state);
474
475 }
478}
479
480impl<Q: QueryCapability> Query<Q> {
481 pub fn new(keys: Q::Keys, query: Q) -> Self {
482 Self {
483 query,
484 keys,
485 enabled: true,
486 stale_time: Duration::ZERO,
487 clean_time: Duration::from_secs(5 * 60),
488 interval_time: Duration::MAX,
489 }
490 }
491
492 pub fn enable(self, enabled: bool) -> Self {
496 Self { enabled, ..self }
497 }
498
499 pub fn stale_time(self, stale_time: Duration) -> Self {
504 Self { stale_time, ..self }
505 }
506
507 pub fn clean_time(self, clean_time: Duration) -> Self {
511 Self { clean_time, ..self }
512 }
513
514 pub fn interval_time(self, interval_time: Duration) -> Self {
520 Self {
521 interval_time,
522 ..self
523 }
524 }
525}
526
527pub struct QueryReader<Q: QueryCapability> {
528 state: Rc<RefCell<QueryStateData<Q>>>,
529}
530
531impl<Q: QueryCapability> QueryReader<Q> {
532 pub fn state(&self) -> Ref<QueryStateData<Q>> {
533 self.state.borrow()
534 }
535
536 pub fn as_settled(&self) -> Ref<Result<Q::Ok, Q::Err>> {
540 Ref::map(self.state.borrow(), |state| match state {
541 QueryStateData::Settled { res, .. } => res,
542 _ => panic!("Query is not settled."),
543 })
544 }
545}
546
547pub struct UseQuery<Q: QueryCapability> {
548 query: Signal<Query<Q>>,
549}
550
551impl<Q: QueryCapability> Clone for UseQuery<Q> {
552 fn clone(&self) -> Self {
553 *self
554 }
555}
556
557impl<Q: QueryCapability> Copy for UseQuery<Q> {}
558
559impl<Q: QueryCapability> UseQuery<Q> {
560 pub fn read(&self) -> QueryReader<Q> {
565 let storage = consume_context::<QueriesStorage<Q>>();
566 let query_data = storage
567 .storage
568 .peek_unchecked()
569 .get(&self.query.peek())
570 .cloned()
571 .unwrap();
572
573 if let Some(reactive_context) = ReactiveContext::current() {
575 reactive_context.subscribe(query_data.reactive_contexts);
576 }
577
578 QueryReader {
579 state: query_data.state,
580 }
581 }
582
583 pub fn peek(&self) -> QueryReader<Q> {
588 let storage = consume_context::<QueriesStorage<Q>>();
589 let query_data = storage
590 .storage
591 .peek_unchecked()
592 .get(&self.query.peek())
593 .cloned()
594 .unwrap();
595
596 QueryReader {
597 state: query_data.state,
598 }
599 }
600
601 pub fn suspend(&self) -> Result<Result<Q::Ok, Q::Err>, RenderError>
605 where
606 Q::Ok: Clone,
607 Q::Err: Clone,
608 {
609 let storage = consume_context::<QueriesStorage<Q>>();
610 let mut storage = storage.storage.write_unchecked();
611 let query_data = storage.get_mut(&self.query.peek()).unwrap();
612
613 if let Some(reactive_context) = ReactiveContext::current() {
615 reactive_context.subscribe(query_data.reactive_contexts.clone());
616 }
617
618 let state = &*query_data.state.borrow();
619 match state {
620 QueryStateData::Pending | QueryStateData::Loading { res: None } => {
621 let suspense_task_clone = query_data.suspense_task.clone();
622 let mut suspense_task = query_data.suspense_task.borrow_mut();
623 let QuerySuspenseData { task, .. } = suspense_task.get_or_insert_with(|| {
624 let notifier = Arc::new(Notify::new());
625 let task = spawn({
626 let notifier = notifier.clone();
627 async move {
628 notifier.notified().await;
629 let _ = suspense_task_clone.borrow_mut().take();
630 }
631 });
632 QuerySuspenseData { notifier, task }
633 });
634 Err(RenderError::Suspended(SuspendedFuture::new(*task)))
635 }
636 QueryStateData::Settled { res, .. } | QueryStateData::Loading { res: Some(res) } => {
637 Ok(res.clone())
638 }
639 }
640 }
641
642 pub async fn invalidate_async(&self) -> QueryReader<Q> {
646 let storage = consume_context::<QueriesStorage<Q>>();
647
648 let query = self.query.peek().clone();
649 let query_data = storage
650 .storage
651 .peek_unchecked()
652 .get(&query)
653 .cloned()
654 .unwrap();
655
656 QueriesStorage::run_queries(&[(&query, &query_data)]).await;
658
659 QueryReader {
660 state: query_data.state.clone(),
661 }
662 }
663
664 pub fn invalidate(&self) {
668 let storage = consume_context::<QueriesStorage<Q>>();
669
670 let query = self.query.peek().clone();
671 let query_data = storage
672 .storage
673 .peek_unchecked()
674 .get(&query)
675 .cloned()
676 .unwrap();
677
678 spawn(async move { QueriesStorage::run_queries(&[(&query, &query_data)]).await });
680 }
681}
682
683pub fn use_query<Q: QueryCapability>(query: Query<Q>) -> UseQuery<Q> {
711 let mut storage = match try_consume_context::<QueriesStorage<Q>>() {
712 Some(storage) => storage,
713 None => provide_root_context(QueriesStorage::<Q>::new_in_root()),
714 };
715
716 let mut make_query = |query: &Query<Q>, mut prev_query: Option<Query<Q>>| {
717 let query_data = storage.insert_or_get_query(query.clone());
718
719 if let Some(prev_query) = prev_query.take() {
721 storage.update_tasks(prev_query);
722 }
723
724 if query.enabled && query_data.state.borrow().is_stale(query) {
726 let query = query.clone();
727 spawn(async move {
728 QueriesStorage::run_queries(&[(&query, &query_data)]).await;
729 });
730 }
731 };
732
733 let mut current_query = use_hook(|| {
734 make_query(&query, None);
735 Signal::new(query.clone())
736 });
737
738 if *current_query.read() != query {
739 let prev = mem::replace(&mut *current_query.write(), query.clone());
740 make_query(&query, Some(prev));
741 }
742
743 use_drop({
745 move || {
746 storage.update_tasks(current_query.peek().clone());
747 }
748 });
749
750 UseQuery {
751 query: current_query,
752 }
753}