1use core::fmt;
2use std::{
3 cell::{Ref, RefCell},
4 collections::{HashMap, HashSet},
5 future::Future,
6 hash::Hash,
7 mem,
8 rc::Rc,
9 sync::{Arc, Mutex},
10 time::Duration,
11};
12
13use ::warnings::Warning;
14use dioxus_lib::prelude::Task;
15use dioxus_lib::prelude::*;
16use dioxus_lib::signals::{Readable, Writable};
17use dioxus_lib::{
18 hooks::{use_memo, use_reactive},
19 signals::CopyValue,
20};
21use futures_util::stream::{FuturesUnordered, StreamExt};
22use tokio::{sync::Notify, time::Instant};
23
24pub trait QueryCapability
25where
26 Self: 'static + Clone + PartialEq + Hash + Eq,
27{
28 type Ok;
29 type Err;
30 type Keys: Hash + PartialEq + Clone;
31
32 fn run(&self, keys: &Self::Keys) -> impl Future<Output = Result<Self::Ok, Self::Err>>;
34
35 fn matches(&self, _keys: &Self::Keys) -> bool {
37 true
38 }
39}
40
41pub enum QueryStateData<Q: QueryCapability> {
42 Pending,
44 Loading { res: Option<Result<Q::Ok, Q::Err>> },
46 Settled {
48 res: Result<Q::Ok, Q::Err>,
49 settlement_instant: Instant,
50 },
51}
52
53impl<Q: QueryCapability> TryFrom<QueryStateData<Q>> for Result<Q::Ok, Q::Err> {
54 type Error = ();
55
56 fn try_from(value: QueryStateData<Q>) -> Result<Self, Self::Error> {
57 match value {
58 QueryStateData::Loading { res: Some(res) } => Ok(res),
59 QueryStateData::Settled { res, .. } => Ok(res),
60 _ => Err(()),
61 }
62 }
63}
64
65impl<Q> fmt::Debug for QueryStateData<Q>
66where
67 Q: QueryCapability,
68 Q::Ok: fmt::Debug,
69 Q::Err: fmt::Debug,
70{
71 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72 match self {
73 Self::Pending => f.write_str("Pending"),
74 Self::Loading { res } => write!(f, "Loading {{ {res:?} }}"),
75 Self::Settled { res, .. } => write!(f, "Settled {{ {res:?} }}"),
76 }
77 }
78}
79
80impl<Q: QueryCapability> QueryStateData<Q> {
81 pub fn is_ok(&self) -> bool {
83 matches!(self, QueryStateData::Settled { res: Ok(_), .. })
84 }
85
86 pub fn is_err(&self) -> bool {
88 matches!(self, QueryStateData::Settled { res: Err(_), .. })
89 }
90
91 pub fn is_loading(&self) -> bool {
93 matches!(self, QueryStateData::Loading { .. })
94 }
95
96 pub fn is_pending(&self) -> bool {
98 matches!(self, QueryStateData::Pending)
99 }
100
101 pub fn is_stale(&self, query: &Query<Q>) -> bool {
103 match self {
104 QueryStateData::Pending => true,
105 QueryStateData::Loading { .. } => true,
106 QueryStateData::Settled {
107 settlement_instant, ..
108 } => Instant::now().duration_since(*settlement_instant) >= query.stale_time,
109 }
110 }
111
112 pub fn ok(&self) -> Option<&Q::Ok> {
114 match self {
115 Self::Settled { res: Ok(res), .. } => Some(res),
116 Self::Loading { res: Some(Ok(res)) } => Some(res),
117 _ => None,
118 }
119 }
120
121 pub fn unwrap(&self) -> &Result<Q::Ok, Q::Err> {
123 match self {
124 Self::Loading { res: Some(v) } => v,
125 Self::Settled { res, .. } => res,
126 _ => unreachable!(),
127 }
128 }
129
130 fn into_loading(self) -> QueryStateData<Q> {
131 match self {
132 QueryStateData::Pending => QueryStateData::Loading { res: None },
133 QueryStateData::Loading { res } => QueryStateData::Loading { res },
134 QueryStateData::Settled { res, .. } => QueryStateData::Loading { res: Some(res) },
135 }
136 }
137}
138pub struct QueriesStorage<Q: QueryCapability> {
139 storage: CopyValue<HashMap<Query<Q>, QueryData<Q>>>,
140}
141
142impl<Q: QueryCapability> Copy for QueriesStorage<Q> {}
143
144impl<Q: QueryCapability> Clone for QueriesStorage<Q> {
145 fn clone(&self) -> Self {
146 *self
147 }
148}
149
150struct QuerySuspenseData {
151 notifier: Arc<Notify>,
152 task: Task,
153}
154
155pub struct QueryData<Q: QueryCapability> {
156 state: Rc<RefCell<QueryStateData<Q>>>,
157 reactive_contexts: Arc<Mutex<HashSet<ReactiveContext>>>,
158
159 suspense_task: Rc<RefCell<Option<QuerySuspenseData>>>,
160 interval_task: Rc<RefCell<Option<(Duration, Task)>>>,
161 clean_task: Rc<RefCell<Option<Task>>>,
162}
163
164impl<Q: QueryCapability> Clone for QueryData<Q> {
165 fn clone(&self) -> Self {
166 Self {
167 state: self.state.clone(),
168 reactive_contexts: self.reactive_contexts.clone(),
169
170 suspense_task: self.suspense_task.clone(),
171 interval_task: self.interval_task.clone(),
172 clean_task: self.clean_task.clone(),
173 }
174 }
175}
176
177impl<Q: QueryCapability> QueriesStorage<Q> {
178 fn new_in_root() -> Self {
179 Self {
180 storage: CopyValue::new_in_scope(HashMap::default(), ScopeId::ROOT),
181 }
182 }
183
184 fn insert_or_get_query(&mut self, query: Query<Q>) -> QueryData<Q> {
185 let query_clone = query.clone();
186 let mut storage = self.storage.write();
187
188 let query_data = storage.entry(query).or_insert_with(|| QueryData {
189 state: Rc::new(RefCell::new(QueryStateData::Pending)),
190 reactive_contexts: Arc::default(),
191 suspense_task: Rc::default(),
192 interval_task: Rc::default(),
193 clean_task: Rc::default(),
194 });
195 let query_data_clone = query_data.clone();
196
197 if let Some(clean_task) = query_data.clean_task.take() {
199 clean_task.cancel();
200 }
201
202 let interval = query_clone.interval_time;
206 let interval_enabled = query_clone.interval_time != Duration::MAX;
207 let interval_task = &mut *query_data.interval_task.borrow_mut();
208
209 let create_interval_task = match interval_task {
210 None if interval_enabled => true,
211 Some((current_interval, current_interval_task)) if interval_enabled => {
212 let new_interval_is_shorter = *current_interval > interval;
213 if new_interval_is_shorter {
214 current_interval_task.cancel();
215 *interval_task = None;
216 }
217 new_interval_is_shorter
218 }
219 _ => false,
220 };
221 if create_interval_task {
222 let task = spawn_forever(async move {
223 loop {
224 tokio::time::sleep(interval).await;
226
227 QueriesStorage::<Q>::run_queries(&[(&query_clone, &query_data_clone)]).await;
229 }
230 })
231 .expect("Failed to spawn interval task.");
232 *interval_task = Some((interval, task));
233 }
234
235 query_data.clone()
236 }
237
238 fn update_tasks(&mut self, query: Query<Q>) {
239 let mut storage_clone = self.storage;
240 let mut storage = self.storage.write();
241
242 let query_data = storage.get_mut(&query).unwrap();
243
244 if let Some((_, interval_task)) = query_data.interval_task.take() {
246 interval_task.cancel();
247 }
248
249 if query_data.reactive_contexts.lock().unwrap().is_empty() {
251 *query_data.clean_task.borrow_mut() = spawn_forever(async move {
252 tokio::time::sleep(query.clean_time).await;
254
255 let mut storage = storage_clone.write();
257 storage.remove(&query);
258 });
259 }
260 }
261
262 pub async fn get(get_query: GetQuery<Q>) -> QueryReader<Q> {
263 let query: Query<Q> = get_query.into();
264
265 let mut storage = match try_consume_context::<QueriesStorage<Q>>() {
266 Some(storage) => storage,
267 None => provide_root_context(QueriesStorage::<Q>::new_in_root()),
268 };
269
270 let query_data = storage
271 .storage
272 .write()
273 .entry(query.clone())
274 .or_insert_with(|| QueryData {
275 state: Rc::new(RefCell::new(QueryStateData::Pending)),
276 reactive_contexts: Arc::default(),
277 suspense_task: Rc::default(),
278 interval_task: Rc::default(),
279 clean_task: Rc::default(),
280 })
281 .clone();
282
283 if query_data.state.borrow().is_stale(&query) {
285 let res = mem::replace(&mut *query_data.state.borrow_mut(), QueryStateData::Pending)
287 .into_loading();
288 *query_data.state.borrow_mut() = res;
289 for reactive_context in query_data.reactive_contexts.lock().unwrap().iter() {
290 reactive_context.mark_dirty();
291 }
292
293 let res = query.query.run(&query.keys).await;
295
296 *query_data.state.borrow_mut() = QueryStateData::Settled {
298 res,
299 settlement_instant: Instant::now(),
300 };
301 for reactive_context in query_data.reactive_contexts.lock().unwrap().iter() {
302 reactive_context.mark_dirty();
303 }
304 }
305
306 if let Some(suspense_task) = &*query_data.suspense_task.borrow() {
308 suspense_task.notifier.notify_waiters();
309 };
310
311 if query_data.reactive_contexts.lock().unwrap().is_empty() {
313 *query_data.clean_task.borrow_mut() = spawn_forever(async move {
314 tokio::time::sleep(query.clean_time).await;
316
317 let mut storage = storage.storage.write();
319 storage.remove(&query);
320 });
321 }
322
323 QueryReader {
324 state: query_data.state,
325 }
326 }
327
328 pub async fn invalidate_all() {
329 let storage = consume_context::<QueriesStorage<Q>>();
330
331 let matching_queries = storage
333 .storage
334 .read()
335 .clone()
336 .into_iter()
337 .collect::<Vec<_>>();
338 let matching_queries = matching_queries
339 .iter()
340 .map(|(q, d)| (q, d))
341 .collect::<Vec<_>>();
342
343 Self::run_queries(&matching_queries).await
345 }
346
347 pub async fn invalidate_matching(matching_keys: Q::Keys) {
348 let storage = consume_context::<QueriesStorage<Q>>();
349
350 let mut matching_queries = Vec::new();
352 for (query, data) in storage.storage.read().iter() {
353 if query.query.matches(&matching_keys) {
354 matching_queries.push((query.clone(), data.clone()));
355 }
356 }
357 let matching_queries = matching_queries
358 .iter()
359 .map(|(q, d)| (q, d))
360 .collect::<Vec<_>>();
361
362 Self::run_queries(&matching_queries).await
364 }
365
366 async fn run_queries(queries: &[(&Query<Q>, &QueryData<Q>)]) {
367 let tasks = FuturesUnordered::new();
368
369 for (query, query_data) in queries {
370 let res = mem::replace(&mut *query_data.state.borrow_mut(), QueryStateData::Pending)
372 .into_loading();
373 *query_data.state.borrow_mut() = res;
374 for reactive_context in query_data.reactive_contexts.lock().unwrap().iter() {
375 reactive_context.mark_dirty();
376 }
377
378 tasks.push(Box::pin(async move {
379 let res = query.query.run(&query.keys).await;
381
382 *query_data.state.borrow_mut() = QueryStateData::Settled {
384 res,
385 settlement_instant: Instant::now(),
386 };
387 for reactive_context in query_data.reactive_contexts.lock().unwrap().iter() {
388 reactive_context.mark_dirty();
389 }
390 }));
391 }
392
393 tasks.count().await;
394 }
395}
396
397pub struct GetQuery<Q: QueryCapability> {
398 query: Q,
399 keys: Q::Keys,
400
401 stale_time: Duration,
402 clean_time: Duration,
403}
404
405impl<Q: QueryCapability> GetQuery<Q> {
406 pub fn new(keys: Q::Keys, query: Q) -> Self {
407 Self {
408 query,
409 keys,
410 stale_time: Duration::ZERO,
411 clean_time: Duration::ZERO,
412 }
413 }
414 pub fn stale_time(self, stale_time: Duration) -> Self {
418 Self { stale_time, ..self }
419 }
420
421 pub fn clean_time(self, clean_time: Duration) -> Self {
425 Self { clean_time, ..self }
426 }
427}
428
429impl<Q: QueryCapability> From<GetQuery<Q>> for Query<Q> {
430 fn from(value: GetQuery<Q>) -> Self {
431 Query {
432 query: value.query,
433 keys: value.keys,
434
435 enabled: true,
436
437 stale_time: value.stale_time,
438 clean_time: value.clean_time,
439 interval_time: Duration::MAX,
440 }
441 }
442}
443#[derive(PartialEq, Clone)]
444pub struct Query<Q: QueryCapability> {
445 query: Q,
446 keys: Q::Keys,
447
448 enabled: bool,
449
450 stale_time: Duration,
451 clean_time: Duration,
452 interval_time: Duration,
453}
454
455impl<Q: QueryCapability> Eq for Query<Q> {}
456impl<Q: QueryCapability> Hash for Query<Q> {
457 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
458 self.query.hash(state);
459 self.keys.hash(state);
460
461 self.enabled.hash(state);
462
463 self.stale_time.hash(state);
464 self.clean_time.hash(state);
465
466 }
469}
470
471impl<Q: QueryCapability> Query<Q> {
472 pub fn new(keys: Q::Keys, query: Q) -> Self {
473 Self {
474 query,
475 keys,
476 enabled: true,
477 stale_time: Duration::ZERO,
478 clean_time: Duration::from_secs(5 * 60),
479 interval_time: Duration::MAX,
480 }
481 }
482
483 pub fn enable(self, enabled: bool) -> Self {
487 Self { enabled, ..self }
488 }
489
490 pub fn stale_time(self, stale_time: Duration) -> Self {
495 Self { stale_time, ..self }
496 }
497
498 pub fn clean_time(self, clean_time: Duration) -> Self {
502 Self { clean_time, ..self }
503 }
504
505 pub fn interval_time(self, interval_time: Duration) -> Self {
511 Self {
512 interval_time,
513 ..self
514 }
515 }
516}
517
518pub struct QueryReader<Q: QueryCapability> {
519 state: Rc<RefCell<QueryStateData<Q>>>,
520}
521
522impl<Q: QueryCapability> QueryReader<Q> {
523 pub fn state(&self) -> Ref<QueryStateData<Q>> {
524 self.state.borrow()
525 }
526
527 pub fn as_settled(&self) -> Ref<Result<Q::Ok, Q::Err>> {
531 Ref::map(self.state.borrow(), |state| match state {
532 QueryStateData::Settled { res, .. } => res,
533 _ => panic!("Query is not settled."),
534 })
535 }
536}
537
538pub struct UseQuery<Q: QueryCapability> {
539 query: Memo<Query<Q>>,
540}
541
542impl<Q: QueryCapability> Clone for UseQuery<Q> {
543 fn clone(&self) -> Self {
544 *self
545 }
546}
547
548impl<Q: QueryCapability> Copy for UseQuery<Q> {}
549
550impl<Q: QueryCapability> UseQuery<Q> {
551 pub fn read(&self) -> QueryReader<Q> {
556 let storage = consume_context::<QueriesStorage<Q>>();
557 let query_data = storage
558 .storage
559 .peek_unchecked()
560 .get(&self.query.peek())
561 .cloned()
562 .unwrap();
563
564 if let Some(reactive_context) = ReactiveContext::current() {
566 reactive_context.subscribe(query_data.reactive_contexts);
567 }
568
569 QueryReader {
570 state: query_data.state,
571 }
572 }
573
574 pub fn peek(&self) -> QueryReader<Q> {
579 let storage = consume_context::<QueriesStorage<Q>>();
580 let query_data = storage
581 .storage
582 .peek_unchecked()
583 .get(&self.query.peek())
584 .cloned()
585 .unwrap();
586
587 QueryReader {
588 state: query_data.state,
589 }
590 }
591
592 pub fn suspend(&self) -> Result<Result<Q::Ok, Q::Err>, RenderError>
596 where
597 Q::Ok: Clone,
598 Q::Err: Clone,
599 {
600 let _allow_write_in_component_body =
601 ::warnings::Allow::new(warnings::signal_write_in_component_body::ID);
602
603 let storage = consume_context::<QueriesStorage<Q>>();
604 let mut storage = storage.storage.write_unchecked();
605 let query_data = storage.get_mut(&self.query.peek()).unwrap();
606
607 if let Some(reactive_context) = ReactiveContext::current() {
609 reactive_context.subscribe(query_data.reactive_contexts.clone());
610 }
611
612 let state = &*query_data.state.borrow();
613 match state {
614 QueryStateData::Pending | QueryStateData::Loading { res: None } => {
615 let suspense_task_clone = query_data.suspense_task.clone();
616 let mut suspense_task = query_data.suspense_task.borrow_mut();
617 let QuerySuspenseData { task, .. } = suspense_task.get_or_insert_with(|| {
618 let notifier = Arc::new(Notify::new());
619 let task = spawn({
620 let notifier = notifier.clone();
621 async move {
622 notifier.notified().await;
623 let _ = suspense_task_clone.borrow_mut().take();
624 }
625 });
626 QuerySuspenseData { notifier, task }
627 });
628 Err(RenderError::Suspended(SuspendedFuture::new(*task)))
629 }
630 QueryStateData::Settled { res, .. } | QueryStateData::Loading { res: Some(res) } => {
631 Ok(res.clone())
632 }
633 }
634 }
635
636 pub async fn invalidate_async(&self) -> QueryReader<Q> {
640 let storage = consume_context::<QueriesStorage<Q>>();
641
642 let query = self.query.peek().clone();
643 let query_data = storage
644 .storage
645 .peek_unchecked()
646 .get(&query)
647 .cloned()
648 .unwrap();
649
650 QueriesStorage::run_queries(&[(&query, &query_data)]).await;
652
653 QueryReader {
654 state: query_data.state.clone(),
655 }
656 }
657
658 pub fn invalidate(&self) {
662 let storage = consume_context::<QueriesStorage<Q>>();
663
664 let query = self.query.peek().clone();
665 let query_data = storage
666 .storage
667 .peek_unchecked()
668 .get(&query)
669 .cloned()
670 .unwrap();
671
672 spawn(async move { QueriesStorage::run_queries(&[(&query, &query_data)]).await });
674 }
675}
676
677pub fn use_query<Q: QueryCapability>(query: Query<Q>) -> UseQuery<Q> {
705 let mut storage = match try_consume_context::<QueriesStorage<Q>>() {
706 Some(storage) => storage,
707 None => provide_root_context(QueriesStorage::<Q>::new_in_root()),
708 };
709
710 let current_query = use_hook(|| Rc::new(RefCell::new(None)));
711
712 let query = use_memo(use_reactive!(|query| {
713 let query_data = storage.insert_or_get_query(query.clone());
714
715 if let Some(prev_query) = current_query.borrow_mut().take() {
717 storage.update_tasks(prev_query);
718 }
719
720 current_query.borrow_mut().replace(query.clone());
722
723 if query.enabled && query_data.state.borrow().is_stale(&query) {
725 let query = query.clone();
726 spawn(async move {
727 QueriesStorage::run_queries(&[(&query, &query_data)]).await;
728 });
729 }
730
731 query
732 }));
733
734 use_drop({
736 move || {
737 storage.update_tasks(query.peek().clone());
738 }
739 });
740
741 UseQuery { query }
742}