1use core::fmt;
2use std::{
3 cell::{Ref, RefCell},
4 collections::{HashMap, HashSet},
5 future::Future,
6 hash::Hash,
7 mem,
8 rc::Rc,
9 sync::{Arc, Mutex},
10 time::Duration,
11};
12
13use ::warnings::Warning;
14use dioxus_lib::prelude::Task;
15use dioxus_lib::prelude::*;
16use dioxus_lib::signals::{Readable, Writable};
17use dioxus_lib::{
18 hooks::{use_memo, use_reactive},
19 signals::CopyValue,
20};
21use futures_util::stream::{FuturesUnordered, StreamExt};
22use tokio::sync::Notify;
23#[cfg(not(target_family = "wasm"))]
24use tokio::time;
25#[cfg(not(target_family = "wasm"))]
26use tokio::time::Instant;
27#[cfg(target_family = "wasm")]
28use wasmtimer::tokio as time;
29#[cfg(target_family = "wasm")]
30use web_time::Instant;
31
/// Describes an asynchronous operation whose result can be cached and shared.
///
/// Implementors are used as part of a map key, which is why they must be
/// `'static`, cloneable, comparable and hashable.
pub trait QueryCapability: 'static + Clone + PartialEq + Hash + Eq {
    /// Value produced when [`run`](Self::run) succeeds.
    type Ok;
    /// Value produced when [`run`](Self::run) fails.
    type Err;
    /// Input handed to [`run`](Self::run) and [`matches`](Self::matches).
    type Keys: Hash + PartialEq + Clone;

    /// Executes the query for the given `keys`.
    fn run(&self, keys: &Self::Keys) -> impl Future<Output = Result<Self::Ok, Self::Err>>;

    /// Tells whether this query is affected by the given keys.
    ///
    /// The default implementation accepts every key.
    fn matches(&self, _keys: &Self::Keys) -> bool {
        true
    }
}
48
49pub enum QueryStateData<Q: QueryCapability> {
50 Pending,
52 Loading { res: Option<Result<Q::Ok, Q::Err>> },
54 Settled {
56 res: Result<Q::Ok, Q::Err>,
57 settlement_instant: Instant,
58 },
59}
60
61impl<Q: QueryCapability> TryFrom<QueryStateData<Q>> for Result<Q::Ok, Q::Err> {
62 type Error = ();
63
64 fn try_from(value: QueryStateData<Q>) -> Result<Self, Self::Error> {
65 match value {
66 QueryStateData::Loading { res: Some(res) } => Ok(res),
67 QueryStateData::Settled { res, .. } => Ok(res),
68 _ => Err(()),
69 }
70 }
71}
72
73impl<Q> fmt::Debug for QueryStateData<Q>
74where
75 Q: QueryCapability,
76 Q::Ok: fmt::Debug,
77 Q::Err: fmt::Debug,
78{
79 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
80 match self {
81 Self::Pending => f.write_str("Pending"),
82 Self::Loading { res } => write!(f, "Loading {{ {res:?} }}"),
83 Self::Settled { res, .. } => write!(f, "Settled {{ {res:?} }}"),
84 }
85 }
86}
87
88impl<Q: QueryCapability> QueryStateData<Q> {
89 pub fn is_ok(&self) -> bool {
91 matches!(self, QueryStateData::Settled { res: Ok(_), .. })
92 }
93
94 pub fn is_err(&self) -> bool {
96 matches!(self, QueryStateData::Settled { res: Err(_), .. })
97 }
98
99 pub fn is_loading(&self) -> bool {
101 matches!(self, QueryStateData::Loading { .. })
102 }
103
104 pub fn is_pending(&self) -> bool {
106 matches!(self, QueryStateData::Pending)
107 }
108
109 pub fn is_stale(&self, query: &Query<Q>) -> bool {
111 match self {
112 QueryStateData::Pending => true,
113 QueryStateData::Loading { .. } => true,
114 QueryStateData::Settled {
115 settlement_instant, ..
116 } => Instant::now().duration_since(*settlement_instant) >= query.stale_time,
117 }
118 }
119
120 pub fn ok(&self) -> Option<&Q::Ok> {
122 match self {
123 Self::Settled { res: Ok(res), .. } => Some(res),
124 Self::Loading { res: Some(Ok(res)) } => Some(res),
125 _ => None,
126 }
127 }
128
129 pub fn unwrap(&self) -> &Result<Q::Ok, Q::Err> {
131 match self {
132 Self::Loading { res: Some(v) } => v,
133 Self::Settled { res, .. } => res,
134 _ => unreachable!(),
135 }
136 }
137
138 fn into_loading(self) -> QueryStateData<Q> {
139 match self {
140 QueryStateData::Pending => QueryStateData::Loading { res: None },
141 QueryStateData::Loading { res } => QueryStateData::Loading { res },
142 QueryStateData::Settled { res, .. } => QueryStateData::Loading { res: Some(res) },
143 }
144 }
145}
146pub struct QueriesStorage<Q: QueryCapability> {
147 storage: CopyValue<HashMap<Query<Q>, QueryData<Q>>>,
148}
149
150impl<Q: QueryCapability> Copy for QueriesStorage<Q> {}
151
152impl<Q: QueryCapability> Clone for QueriesStorage<Q> {
153 fn clone(&self) -> Self {
154 *self
155 }
156}
157
158struct QuerySuspenseData {
159 notifier: Arc<Notify>,
160 task: Task,
161}
162
163pub struct QueryData<Q: QueryCapability> {
164 state: Rc<RefCell<QueryStateData<Q>>>,
165 reactive_contexts: Arc<Mutex<HashSet<ReactiveContext>>>,
166
167 suspense_task: Rc<RefCell<Option<QuerySuspenseData>>>,
168 interval_task: Rc<RefCell<Option<(Duration, Task)>>>,
169 clean_task: Rc<RefCell<Option<Task>>>,
170}
171
172impl<Q: QueryCapability> Clone for QueryData<Q> {
173 fn clone(&self) -> Self {
174 Self {
175 state: self.state.clone(),
176 reactive_contexts: self.reactive_contexts.clone(),
177
178 suspense_task: self.suspense_task.clone(),
179 interval_task: self.interval_task.clone(),
180 clean_task: self.clean_task.clone(),
181 }
182 }
183}
184
185impl<Q: QueryCapability> QueriesStorage<Q> {
186 fn new_in_root() -> Self {
187 Self {
188 storage: CopyValue::new_in_scope(HashMap::default(), ScopeId::ROOT),
189 }
190 }
191
192 fn insert_or_get_query(&mut self, query: Query<Q>) -> QueryData<Q> {
193 let query_clone = query.clone();
194 let mut storage = self.storage.write();
195
196 let query_data = storage.entry(query).or_insert_with(|| QueryData {
197 state: Rc::new(RefCell::new(QueryStateData::Pending)),
198 reactive_contexts: Arc::default(),
199 suspense_task: Rc::default(),
200 interval_task: Rc::default(),
201 clean_task: Rc::default(),
202 });
203 let query_data_clone = query_data.clone();
204
205 if let Some(clean_task) = query_data.clean_task.take() {
207 clean_task.cancel();
208 }
209
210 let interval = query_clone.interval_time;
214 let interval_enabled = query_clone.interval_time != Duration::MAX;
215 let interval_task = &mut *query_data.interval_task.borrow_mut();
216
217 let create_interval_task = match interval_task {
218 None if interval_enabled => true,
219 Some((current_interval, current_interval_task)) if interval_enabled => {
220 let new_interval_is_shorter = *current_interval > interval;
221 if new_interval_is_shorter {
222 current_interval_task.cancel();
223 *interval_task = None;
224 }
225 new_interval_is_shorter
226 }
227 _ => false,
228 };
229 if create_interval_task {
230 let task = spawn_forever(async move {
231 loop {
232 time::sleep(interval).await;
234
235 QueriesStorage::<Q>::run_queries(&[(&query_clone, &query_data_clone)]).await;
237 }
238 })
239 .expect("Failed to spawn interval task.");
240 *interval_task = Some((interval, task));
241 }
242
243 query_data.clone()
244 }
245
246 fn update_tasks(&mut self, query: Query<Q>) {
247 let mut storage_clone = self.storage;
248 let mut storage = self.storage.write();
249
250 let query_data = storage.get_mut(&query).unwrap();
251
252 if let Some((_, interval_task)) = query_data.interval_task.take() {
254 interval_task.cancel();
255 }
256
257 if query_data.reactive_contexts.lock().unwrap().is_empty() {
259 *query_data.clean_task.borrow_mut() = spawn_forever(async move {
260 time::sleep(query.clean_time).await;
262
263 let mut storage = storage_clone.write();
265 storage.remove(&query);
266 });
267 }
268 }
269
270 pub async fn get(get_query: GetQuery<Q>) -> QueryReader<Q> {
271 let query: Query<Q> = get_query.into();
272
273 let mut storage = match try_consume_context::<QueriesStorage<Q>>() {
274 Some(storage) => storage,
275 None => provide_root_context(QueriesStorage::<Q>::new_in_root()),
276 };
277
278 let query_data = storage
279 .storage
280 .write()
281 .entry(query.clone())
282 .or_insert_with(|| QueryData {
283 state: Rc::new(RefCell::new(QueryStateData::Pending)),
284 reactive_contexts: Arc::default(),
285 suspense_task: Rc::default(),
286 interval_task: Rc::default(),
287 clean_task: Rc::default(),
288 })
289 .clone();
290
291 if query_data.state.borrow().is_stale(&query) {
293 let res = mem::replace(&mut *query_data.state.borrow_mut(), QueryStateData::Pending)
295 .into_loading();
296 *query_data.state.borrow_mut() = res;
297 for reactive_context in query_data.reactive_contexts.lock().unwrap().iter() {
298 reactive_context.mark_dirty();
299 }
300
301 let res = query.query.run(&query.keys).await;
303
304 *query_data.state.borrow_mut() = QueryStateData::Settled {
306 res,
307 settlement_instant: Instant::now(),
308 };
309 for reactive_context in query_data.reactive_contexts.lock().unwrap().iter() {
310 reactive_context.mark_dirty();
311 }
312
313 if let Some(suspense_task) = &*query_data.suspense_task.borrow() {
315 suspense_task.notifier.notify_waiters();
316 };
317 }
318
319 if query_data.reactive_contexts.lock().unwrap().is_empty() {
321 *query_data.clean_task.borrow_mut() = spawn_forever(async move {
322 time::sleep(query.clean_time).await;
324
325 let mut storage = storage.storage.write();
327 storage.remove(&query);
328 });
329 }
330
331 QueryReader {
332 state: query_data.state,
333 }
334 }
335
336 pub async fn invalidate_all() {
337 let storage = consume_context::<QueriesStorage<Q>>();
338
339 let matching_queries = storage
341 .storage
342 .read()
343 .clone()
344 .into_iter()
345 .collect::<Vec<_>>();
346 let matching_queries = matching_queries
347 .iter()
348 .map(|(q, d)| (q, d))
349 .collect::<Vec<_>>();
350
351 Self::run_queries(&matching_queries).await
353 }
354
355 pub async fn invalidate_matching(matching_keys: Q::Keys) {
356 let storage = consume_context::<QueriesStorage<Q>>();
357
358 let mut matching_queries = Vec::new();
360 for (query, data) in storage.storage.read().iter() {
361 if query.query.matches(&matching_keys) {
362 matching_queries.push((query.clone(), data.clone()));
363 }
364 }
365 let matching_queries = matching_queries
366 .iter()
367 .map(|(q, d)| (q, d))
368 .collect::<Vec<_>>();
369
370 Self::run_queries(&matching_queries).await
372 }
373
374 async fn run_queries(queries: &[(&Query<Q>, &QueryData<Q>)]) {
375 let tasks = FuturesUnordered::new();
376
377 for (query, query_data) in queries {
378 let res = mem::replace(&mut *query_data.state.borrow_mut(), QueryStateData::Pending)
380 .into_loading();
381 *query_data.state.borrow_mut() = res;
382 for reactive_context in query_data.reactive_contexts.lock().unwrap().iter() {
383 reactive_context.mark_dirty();
384 }
385
386 tasks.push(Box::pin(async move {
387 let res = query.query.run(&query.keys).await;
389
390 *query_data.state.borrow_mut() = QueryStateData::Settled {
392 res,
393 settlement_instant: Instant::now(),
394 };
395 for reactive_context in query_data.reactive_contexts.lock().unwrap().iter() {
396 reactive_context.mark_dirty();
397 }
398
399 if let Some(suspense_task) = &*query_data.suspense_task.borrow() {
401 suspense_task.notifier.notify_waiters();
402 };
403 }));
404 }
405
406 tasks.count().await;
407 }
408}
409
410pub struct GetQuery<Q: QueryCapability> {
411 query: Q,
412 keys: Q::Keys,
413
414 stale_time: Duration,
415 clean_time: Duration,
416}
417
418impl<Q: QueryCapability> GetQuery<Q> {
419 pub fn new(keys: Q::Keys, query: Q) -> Self {
420 Self {
421 query,
422 keys,
423 stale_time: Duration::ZERO,
424 clean_time: Duration::ZERO,
425 }
426 }
427 pub fn stale_time(self, stale_time: Duration) -> Self {
431 Self { stale_time, ..self }
432 }
433
434 pub fn clean_time(self, clean_time: Duration) -> Self {
438 Self { clean_time, ..self }
439 }
440}
441
442impl<Q: QueryCapability> From<GetQuery<Q>> for Query<Q> {
443 fn from(value: GetQuery<Q>) -> Self {
444 Query {
445 query: value.query,
446 keys: value.keys,
447
448 enabled: true,
449
450 stale_time: value.stale_time,
451 clean_time: value.clean_time,
452 interval_time: Duration::MAX,
453 }
454 }
455}
456#[derive(PartialEq, Clone)]
457pub struct Query<Q: QueryCapability> {
458 query: Q,
459 keys: Q::Keys,
460
461 enabled: bool,
462
463 stale_time: Duration,
464 clean_time: Duration,
465 interval_time: Duration,
466}
467
468impl<Q: QueryCapability> Eq for Query<Q> {}
469impl<Q: QueryCapability> Hash for Query<Q> {
470 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
471 self.query.hash(state);
472 self.keys.hash(state);
473
474 self.enabled.hash(state);
475
476 self.stale_time.hash(state);
477 self.clean_time.hash(state);
478
479 }
482}
483
484impl<Q: QueryCapability> Query<Q> {
485 pub fn new(keys: Q::Keys, query: Q) -> Self {
486 Self {
487 query,
488 keys,
489 enabled: true,
490 stale_time: Duration::ZERO,
491 clean_time: Duration::from_secs(5 * 60),
492 interval_time: Duration::MAX,
493 }
494 }
495
496 pub fn enable(self, enabled: bool) -> Self {
500 Self { enabled, ..self }
501 }
502
503 pub fn stale_time(self, stale_time: Duration) -> Self {
508 Self { stale_time, ..self }
509 }
510
511 pub fn clean_time(self, clean_time: Duration) -> Self {
515 Self { clean_time, ..self }
516 }
517
518 pub fn interval_time(self, interval_time: Duration) -> Self {
524 Self {
525 interval_time,
526 ..self
527 }
528 }
529}
530
531pub struct QueryReader<Q: QueryCapability> {
532 state: Rc<RefCell<QueryStateData<Q>>>,
533}
534
535impl<Q: QueryCapability> QueryReader<Q> {
536 pub fn state(&self) -> Ref<QueryStateData<Q>> {
537 self.state.borrow()
538 }
539
540 pub fn as_settled(&self) -> Ref<Result<Q::Ok, Q::Err>> {
544 Ref::map(self.state.borrow(), |state| match state {
545 QueryStateData::Settled { res, .. } => res,
546 _ => panic!("Query is not settled."),
547 })
548 }
549}
550
551pub struct UseQuery<Q: QueryCapability> {
552 query: Memo<Query<Q>>,
553}
554
555impl<Q: QueryCapability> Clone for UseQuery<Q> {
556 fn clone(&self) -> Self {
557 *self
558 }
559}
560
561impl<Q: QueryCapability> Copy for UseQuery<Q> {}
562
563impl<Q: QueryCapability> UseQuery<Q> {
564 pub fn read(&self) -> QueryReader<Q> {
569 let storage = consume_context::<QueriesStorage<Q>>();
570 let query_data = storage
571 .storage
572 .peek_unchecked()
573 .get(&self.query.peek())
574 .cloned()
575 .unwrap();
576
577 if let Some(reactive_context) = ReactiveContext::current() {
579 reactive_context.subscribe(query_data.reactive_contexts);
580 }
581
582 QueryReader {
583 state: query_data.state,
584 }
585 }
586
587 pub fn peek(&self) -> QueryReader<Q> {
592 let storage = consume_context::<QueriesStorage<Q>>();
593 let query_data = storage
594 .storage
595 .peek_unchecked()
596 .get(&self.query.peek())
597 .cloned()
598 .unwrap();
599
600 QueryReader {
601 state: query_data.state,
602 }
603 }
604
605 pub fn suspend(&self) -> Result<Result<Q::Ok, Q::Err>, RenderError>
609 where
610 Q::Ok: Clone,
611 Q::Err: Clone,
612 {
613 let _allow_write_in_component_body =
614 ::warnings::Allow::new(warnings::signal_write_in_component_body::ID);
615
616 let storage = consume_context::<QueriesStorage<Q>>();
617 let mut storage = storage.storage.write_unchecked();
618 let query_data = storage.get_mut(&self.query.peek()).unwrap();
619
620 if let Some(reactive_context) = ReactiveContext::current() {
622 reactive_context.subscribe(query_data.reactive_contexts.clone());
623 }
624
625 let state = &*query_data.state.borrow();
626 match state {
627 QueryStateData::Pending | QueryStateData::Loading { res: None } => {
628 let suspense_task_clone = query_data.suspense_task.clone();
629 let mut suspense_task = query_data.suspense_task.borrow_mut();
630 let QuerySuspenseData { task, .. } = suspense_task.get_or_insert_with(|| {
631 let notifier = Arc::new(Notify::new());
632 let task = spawn({
633 let notifier = notifier.clone();
634 async move {
635 notifier.notified().await;
636 let _ = suspense_task_clone.borrow_mut().take();
637 }
638 });
639 QuerySuspenseData { notifier, task }
640 });
641 Err(RenderError::Suspended(SuspendedFuture::new(*task)))
642 }
643 QueryStateData::Settled { res, .. } | QueryStateData::Loading { res: Some(res) } => {
644 Ok(res.clone())
645 }
646 }
647 }
648
649 pub async fn invalidate_async(&self) -> QueryReader<Q> {
653 let storage = consume_context::<QueriesStorage<Q>>();
654
655 let query = self.query.peek().clone();
656 let query_data = storage
657 .storage
658 .peek_unchecked()
659 .get(&query)
660 .cloned()
661 .unwrap();
662
663 QueriesStorage::run_queries(&[(&query, &query_data)]).await;
665
666 QueryReader {
667 state: query_data.state.clone(),
668 }
669 }
670
671 pub fn invalidate(&self) {
675 let storage = consume_context::<QueriesStorage<Q>>();
676
677 let query = self.query.peek().clone();
678 let query_data = storage
679 .storage
680 .peek_unchecked()
681 .get(&query)
682 .cloned()
683 .unwrap();
684
685 spawn(async move { QueriesStorage::run_queries(&[(&query, &query_data)]).await });
687 }
688}
689
690pub fn use_query<Q: QueryCapability>(query: Query<Q>) -> UseQuery<Q> {
718 let mut storage = match try_consume_context::<QueriesStorage<Q>>() {
719 Some(storage) => storage,
720 None => provide_root_context(QueriesStorage::<Q>::new_in_root()),
721 };
722
723 let current_query = use_hook(|| Rc::new(RefCell::new(None)));
724
725 let query = use_memo(use_reactive!(|query| {
726 let query_data = storage.insert_or_get_query(query.clone());
727
728 if let Some(prev_query) = current_query.borrow_mut().take() {
730 storage.update_tasks(prev_query);
731 }
732
733 current_query.borrow_mut().replace(query.clone());
735
736 if query.enabled && query_data.state.borrow().is_stale(&query) {
738 let query = query.clone();
739 spawn(async move {
740 QueriesStorage::run_queries(&[(&query, &query_data)]).await;
741 });
742 }
743
744 query
745 }));
746
747 use_drop({
749 move || {
750 storage.update_tasks(query.peek().clone());
751 }
752 });
753
754 UseQuery { query }
755}