dioxus_query/
query.rs

1use core::fmt;
2use std::{
3    cell::{Ref, RefCell},
4    collections::{HashMap, HashSet},
5    future::Future,
6    hash::Hash,
7    mem,
8    rc::Rc,
9    sync::{Arc, Mutex},
10    time::Duration,
11};
12
13use dioxus::prelude::*;
14use dioxus::{
15    hooks::{use_memo, use_reactive},
16    signals::CopyValue,
17};
18use dioxus_core::{
19    provide_root_context, spawn_forever, use_drop, ReactiveContext, SuspendedFuture, Task,
20};
21use futures_util::stream::{FuturesUnordered, StreamExt};
22use tokio::sync::Notify;
23#[cfg(not(target_family = "wasm"))]
24use tokio::time;
25#[cfg(not(target_family = "wasm"))]
26use tokio::time::Instant;
27#[cfg(target_family = "wasm")]
28use wasmtimer::tokio as time;
29#[cfg(target_family = "wasm")]
30use web_time::Instant;
31
32pub trait QueryCapability
33where
34    Self: 'static + Clone + PartialEq + Hash + Eq,
35{
36    type Ok;
37    type Err;
38    type Keys: Hash + PartialEq + Clone;
39
40    /// Query logic.
41    fn run(&self, keys: &Self::Keys) -> impl Future<Output = Result<Self::Ok, Self::Err>>;
42
43    /// Implement a custom logic to check if this query should be invalidated or not given a [QueryCapability::Keys].
44    fn matches(&self, _keys: &Self::Keys) -> bool {
45        true
46    }
47}
48
49pub enum QueryStateData<Q: QueryCapability> {
50    /// Has not loaded yet.
51    Pending,
52    /// Is loading and may not have a previous settled value.
53    Loading { res: Option<Result<Q::Ok, Q::Err>> },
54    /// Is not loading and has a settled value.
55    Settled {
56        res: Result<Q::Ok, Q::Err>,
57        settlement_instant: Instant,
58    },
59}
60
61impl<Q: QueryCapability> TryFrom<QueryStateData<Q>> for Result<Q::Ok, Q::Err> {
62    type Error = ();
63
64    fn try_from(value: QueryStateData<Q>) -> Result<Self, Self::Error> {
65        match value {
66            QueryStateData::Loading { res: Some(res) } => Ok(res),
67            QueryStateData::Settled { res, .. } => Ok(res),
68            _ => Err(()),
69        }
70    }
71}
72
73impl<Q> fmt::Debug for QueryStateData<Q>
74where
75    Q: QueryCapability,
76    Q::Ok: fmt::Debug,
77    Q::Err: fmt::Debug,
78{
79    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
80        match self {
81            Self::Pending => f.write_str("Pending"),
82            Self::Loading { res } => write!(f, "Loading {{ {res:?} }}"),
83            Self::Settled { res, .. } => write!(f, "Settled {{ {res:?} }}"),
84        }
85    }
86}
87
88impl<Q: QueryCapability> QueryStateData<Q> {
89    /// Check if the state is [QueryStateData::Settled] and [Result::Ok].
90    pub fn is_ok(&self) -> bool {
91        matches!(self, QueryStateData::Settled { res: Ok(_), .. })
92    }
93
94    /// Check if the state is [QueryStateData::Settled] and [Result::Err].
95    pub fn is_err(&self) -> bool {
96        matches!(self, QueryStateData::Settled { res: Err(_), .. })
97    }
98
99    /// Check if the state is [QueryStateData::Loading].
100    pub fn is_loading(&self) -> bool {
101        matches!(self, QueryStateData::Loading { .. })
102    }
103
104    /// Check if the state is [QueryStateData::Pending].
105    pub fn is_pending(&self) -> bool {
106        matches!(self, QueryStateData::Pending)
107    }
108
109    /// Check if the state is stale or not, where stale means outdated.
110    pub fn is_stale(&self, query: &Query<Q>) -> bool {
111        match self {
112            QueryStateData::Pending => true,
113            QueryStateData::Loading { .. } => true,
114            QueryStateData::Settled {
115                settlement_instant, ..
116            } => Instant::now().duration_since(*settlement_instant) >= query.stale_time,
117        }
118    }
119
120    /// Get the value as an [Option].
121    pub fn ok(&self) -> Option<&Q::Ok> {
122        match self {
123            Self::Settled { res: Ok(res), .. } => Some(res),
124            Self::Loading { res: Some(Ok(res)) } => Some(res),
125            _ => None,
126        }
127    }
128
129    /// Get the value as an [Result] if possible, otherwise it will panic.
130    pub fn unwrap(&self) -> &Result<Q::Ok, Q::Err> {
131        match self {
132            Self::Loading { res: Some(v) } => v,
133            Self::Settled { res, .. } => res,
134            _ => unreachable!(),
135        }
136    }
137
138    fn into_loading(self) -> QueryStateData<Q> {
139        match self {
140            QueryStateData::Pending => QueryStateData::Loading { res: None },
141            QueryStateData::Loading { res } => QueryStateData::Loading { res },
142            QueryStateData::Settled { res, .. } => QueryStateData::Loading { res: Some(res) },
143        }
144    }
145}
146pub struct QueriesStorage<Q: QueryCapability> {
147    storage: CopyValue<HashMap<Query<Q>, QueryData<Q>>>,
148}
149
150impl<Q: QueryCapability> Copy for QueriesStorage<Q> {}
151
152impl<Q: QueryCapability> Clone for QueriesStorage<Q> {
153    fn clone(&self) -> Self {
154        *self
155    }
156}
157
158struct QuerySuspenseData {
159    notifier: Arc<Notify>,
160    task: Task,
161}
162
163pub struct QueryData<Q: QueryCapability> {
164    state: Rc<RefCell<QueryStateData<Q>>>,
165    reactive_contexts: Arc<Mutex<HashSet<ReactiveContext>>>,
166
167    suspense_task: Rc<RefCell<Option<QuerySuspenseData>>>,
168    interval_task: Rc<RefCell<Option<(Duration, Task)>>>,
169    clean_task: Rc<RefCell<Option<Task>>>,
170}
171
172impl<Q: QueryCapability> Clone for QueryData<Q> {
173    fn clone(&self) -> Self {
174        Self {
175            state: self.state.clone(),
176            reactive_contexts: self.reactive_contexts.clone(),
177
178            suspense_task: self.suspense_task.clone(),
179            interval_task: self.interval_task.clone(),
180            clean_task: self.clean_task.clone(),
181        }
182    }
183}
184
185impl<Q: QueryCapability> QueriesStorage<Q> {
186    fn new_in_root() -> Self {
187        Self {
188            storage: CopyValue::new_in_scope(HashMap::default(), ScopeId::ROOT),
189        }
190    }
191
192    fn insert_or_get_query(&mut self, query: Query<Q>) -> QueryData<Q> {
193        let query_clone = query.clone();
194        let mut storage = self.storage.write();
195
196        let query_data = storage.entry(query).or_insert_with(|| QueryData {
197            state: Rc::new(RefCell::new(QueryStateData::Pending)),
198            reactive_contexts: Arc::default(),
199            suspense_task: Rc::default(),
200            interval_task: Rc::default(),
201            clean_task: Rc::default(),
202        });
203        let query_data_clone = query_data.clone();
204
205        // Cancel clean task
206        if let Some(clean_task) = query_data.clean_task.take() {
207            clean_task.cancel();
208        }
209
210        // Start an interval task if necessary
211        // If multiple queries subscribers use different intervals the interval task
212        // will run using the shortest interval
213        let interval = query_clone.interval_time;
214        let interval_enabled = query_clone.interval_time != Duration::MAX;
215        let interval_task = &mut *query_data.interval_task.borrow_mut();
216
217        let create_interval_task = match interval_task {
218            None if interval_enabled => true,
219            Some((current_interval, current_interval_task)) if interval_enabled => {
220                let new_interval_is_shorter = *current_interval > interval;
221                if new_interval_is_shorter {
222                    current_interval_task.cancel();
223                    *interval_task = None;
224                }
225                new_interval_is_shorter
226            }
227            _ => false,
228        };
229        if create_interval_task {
230            let task = spawn_forever(async move {
231                loop {
232                    // Wait as long as the stale time is configured
233                    time::sleep(interval).await;
234
235                    // Run the query
236                    QueriesStorage::<Q>::run_queries(&[(&query_clone, &query_data_clone)]).await;
237                }
238            });
239            *interval_task = Some((interval, task));
240        }
241
242        query_data.clone()
243    }
244
245    fn update_tasks(&mut self, query: Query<Q>) {
246        let mut storage_clone = self.storage;
247        let mut storage = self.storage.write();
248
249        let query_data = storage.get_mut(&query).unwrap();
250
251        // Cancel interval task
252        if let Some((_, interval_task)) = query_data.interval_task.take() {
253            interval_task.cancel();
254        }
255
256        // Spawn clean up task if there no more reactive contexts
257        if query_data.reactive_contexts.lock().unwrap().is_empty() {
258            *query_data.clean_task.borrow_mut() = Some(spawn_forever(async move {
259                // Wait as long as the stale time is configured
260                time::sleep(query.clean_time).await;
261
262                // Finally clear the query
263                let mut storage = storage_clone.write();
264                storage.remove(&query);
265            }));
266        }
267    }
268
269    pub async fn get(get_query: GetQuery<Q>) -> QueryReader<Q> {
270        let query: Query<Q> = get_query.into();
271
272        let mut storage = match try_consume_context::<QueriesStorage<Q>>() {
273            Some(storage) => storage,
274            None => provide_root_context(QueriesStorage::<Q>::new_in_root()),
275        };
276
277        let query_data = storage
278            .storage
279            .write()
280            .entry(query.clone())
281            .or_insert_with(|| QueryData {
282                state: Rc::new(RefCell::new(QueryStateData::Pending)),
283                reactive_contexts: Arc::default(),
284                suspense_task: Rc::default(),
285                interval_task: Rc::default(),
286                clean_task: Rc::default(),
287            })
288            .clone();
289
290        // Run the query if the value is stale
291        if query_data.state.borrow().is_stale(&query) {
292            // Set to Loading
293            let res = mem::replace(&mut *query_data.state.borrow_mut(), QueryStateData::Pending)
294                .into_loading();
295            *query_data.state.borrow_mut() = res;
296            for reactive_context in query_data.reactive_contexts.lock().unwrap().iter() {
297                reactive_context.mark_dirty();
298            }
299
300            // Run
301            let res = query.query.run(&query.keys).await;
302
303            // Set to Settled
304            *query_data.state.borrow_mut() = QueryStateData::Settled {
305                res,
306                settlement_instant: Instant::now(),
307            };
308            for reactive_context in query_data.reactive_contexts.lock().unwrap().iter() {
309                reactive_context.mark_dirty();
310            }
311
312            // Notify the suspense task if any
313            if let Some(suspense_task) = &*query_data.suspense_task.borrow() {
314                suspense_task.notifier.notify_waiters();
315            };
316        }
317
318        // Spawn clean up task if there no more reactive contexts
319        if query_data.reactive_contexts.lock().unwrap().is_empty() {
320            *query_data.clean_task.borrow_mut() = Some(spawn_forever(async move {
321                // Wait as long as the stale time is configured
322                time::sleep(query.clean_time).await;
323
324                // Finally clear the query
325                let mut storage = storage.storage.write();
326                storage.remove(&query);
327            }));
328        }
329
330        QueryReader {
331            state: query_data.state,
332        }
333    }
334
335    pub async fn invalidate_all() {
336        let storage = consume_context::<QueriesStorage<Q>>();
337
338        // Get all the queries
339        let matching_queries = storage
340            .storage
341            .read()
342            .clone()
343            .into_iter()
344            .collect::<Vec<_>>();
345        let matching_queries = matching_queries
346            .iter()
347            .map(|(q, d)| (q, d))
348            .collect::<Vec<_>>();
349
350        // Invalidate the queries
351        Self::run_queries(&matching_queries).await
352    }
353
354    pub async fn invalidate_matching(matching_keys: Q::Keys) {
355        let storage = consume_context::<QueriesStorage<Q>>();
356
357        // Get those queries that match
358        let mut matching_queries = Vec::new();
359        for (query, data) in storage.storage.read().iter() {
360            if query.query.matches(&matching_keys) {
361                matching_queries.push((query.clone(), data.clone()));
362            }
363        }
364        let matching_queries = matching_queries
365            .iter()
366            .map(|(q, d)| (q, d))
367            .collect::<Vec<_>>();
368
369        // Invalidate the queries
370        Self::run_queries(&matching_queries).await
371    }
372
373    async fn run_queries(queries: &[(&Query<Q>, &QueryData<Q>)]) {
374        let tasks = FuturesUnordered::new();
375
376        for (query, query_data) in queries {
377            // Set to Loading
378            let res = mem::replace(&mut *query_data.state.borrow_mut(), QueryStateData::Pending)
379                .into_loading();
380            *query_data.state.borrow_mut() = res;
381            for reactive_context in query_data.reactive_contexts.lock().unwrap().iter() {
382                reactive_context.mark_dirty();
383            }
384
385            tasks.push(Box::pin(async move {
386                // Run
387                let res = query.query.run(&query.keys).await;
388
389                // Set to settled
390                *query_data.state.borrow_mut() = QueryStateData::Settled {
391                    res,
392                    settlement_instant: Instant::now(),
393                };
394                for reactive_context in query_data.reactive_contexts.lock().unwrap().iter() {
395                    reactive_context.mark_dirty();
396                }
397
398                // Notify the suspense task if any
399                if let Some(suspense_task) = &*query_data.suspense_task.borrow() {
400                    suspense_task.notifier.notify_waiters();
401                };
402            }));
403        }
404
405        tasks.count().await;
406    }
407}
408
409pub struct GetQuery<Q: QueryCapability> {
410    query: Q,
411    keys: Q::Keys,
412
413    stale_time: Duration,
414    clean_time: Duration,
415}
416
417impl<Q: QueryCapability> GetQuery<Q> {
418    pub fn new(keys: Q::Keys, query: Q) -> Self {
419        Self {
420            query,
421            keys,
422            stale_time: Duration::ZERO,
423            clean_time: Duration::ZERO,
424        }
425    }
426    /// For how long is the data considered stale. If a query subscriber is mounted and the data is stale, it will re run the query.
427    ///
428    /// Defaults to [Duration::ZERO], meaning it is marked stale immediately.
429    pub fn stale_time(self, stale_time: Duration) -> Self {
430        Self { stale_time, ..self }
431    }
432
433    /// For how long the data is kept cached after there are no more query subscribers.
434    ///
435    /// Defaults to [Duration::ZERO], meaning it clears automatically.
436    pub fn clean_time(self, clean_time: Duration) -> Self {
437        Self { clean_time, ..self }
438    }
439}
440
441impl<Q: QueryCapability> From<GetQuery<Q>> for Query<Q> {
442    fn from(value: GetQuery<Q>) -> Self {
443        Query {
444            query: value.query,
445            keys: value.keys,
446
447            enabled: true,
448
449            stale_time: value.stale_time,
450            clean_time: value.clean_time,
451            interval_time: Duration::MAX,
452        }
453    }
454}
455#[derive(PartialEq, Clone)]
456pub struct Query<Q: QueryCapability> {
457    query: Q,
458    keys: Q::Keys,
459
460    enabled: bool,
461
462    stale_time: Duration,
463    clean_time: Duration,
464    interval_time: Duration,
465}
466
467impl<Q: QueryCapability> Eq for Query<Q> {}
468impl<Q: QueryCapability> Hash for Query<Q> {
469    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
470        self.query.hash(state);
471        self.keys.hash(state);
472
473        self.enabled.hash(state);
474
475        self.stale_time.hash(state);
476        self.clean_time.hash(state);
477
478        // Intentionally left out as intervals can vary from one query subscriber to another
479        // self.interval_time.hash(state);
480    }
481}
482
483impl<Q: QueryCapability> Query<Q> {
484    pub fn new(keys: Q::Keys, query: Q) -> Self {
485        Self {
486            query,
487            keys,
488            enabled: true,
489            stale_time: Duration::ZERO,
490            clean_time: Duration::from_secs(5 * 60),
491            interval_time: Duration::MAX,
492        }
493    }
494
495    /// Enable or disable this query so that it doesnt automatically run.
496    ///
497    /// Defaults to `true`.
498    pub fn enable(self, enabled: bool) -> Self {
499        Self { enabled, ..self }
500    }
501
502    /// For how long is the data considered stale. If a query subscriber is mounted and the data is stale, it will re run the query
503    /// otherwise it return the cached data.
504    ///
505    /// Defaults to [Duration::ZERO], meaning it is marked stale immediately after it has been used.
506    pub fn stale_time(self, stale_time: Duration) -> Self {
507        Self { stale_time, ..self }
508    }
509
510    /// For how long the data is kept cached after there are no more query subscribers.
511    ///
512    /// Defaults to `5min`, meaning it clears automatically after 5 minutes of no subscribers to it.
513    pub fn clean_time(self, clean_time: Duration) -> Self {
514        Self { clean_time, ..self }
515    }
516
517    /// Every how often the query reruns.
518    ///
519    /// Defaults to [Duration::MAX], meaning it never re runs automatically.
520    ///
521    /// **Note**: If multiple subscribers of the same query use different intervals, only the shortest one will be used.
522    pub fn interval_time(self, interval_time: Duration) -> Self {
523        Self {
524            interval_time,
525            ..self
526        }
527    }
528}
529
530pub struct QueryReader<Q: QueryCapability> {
531    state: Rc<RefCell<QueryStateData<Q>>>,
532}
533
534impl<Q: QueryCapability> QueryReader<Q> {
535    pub fn state(&self) -> Ref<QueryStateData<Q>> {
536        self.state.borrow()
537    }
538
539    /// Get the result of the query.
540    ///
541    /// **This method will panic if the query is not settled.**
542    pub fn as_settled(&self) -> Ref<Result<Q::Ok, Q::Err>> {
543        Ref::map(self.state.borrow(), |state| match state {
544            QueryStateData::Settled { res, .. } => res,
545            _ => panic!("Query is not settled."),
546        })
547    }
548}
549
550pub struct UseQuery<Q: QueryCapability> {
551    query: Memo<Query<Q>>,
552}
553
554impl<Q: QueryCapability> Clone for UseQuery<Q> {
555    fn clone(&self) -> Self {
556        *self
557    }
558}
559
560impl<Q: QueryCapability> Copy for UseQuery<Q> {}
561
562impl<Q: QueryCapability> UseQuery<Q> {
563    /// Read the [Query] state.
564    ///
565    /// This **will** automatically subscribe.
566    /// If you want a **non-subscribing** method have a look at [UseQuery::peek].
567    pub fn read(&self) -> QueryReader<Q> {
568        let storage = consume_context::<QueriesStorage<Q>>();
569        let query_data = storage
570            .storage
571            .peek_unchecked()
572            .get(&self.query.peek())
573            .cloned()
574            .unwrap();
575
576        // Subscribe if possible
577        if let Some(reactive_context) = ReactiveContext::current() {
578            reactive_context.subscribe(query_data.reactive_contexts);
579        }
580
581        QueryReader {
582            state: query_data.state,
583        }
584    }
585
586    /// Read the [Query] state.
587    ///
588    /// This **will not** automatically subscribe.
589    /// If you want a **subscribing** method have a look at [UseQuery::read].
590    pub fn peek(&self) -> QueryReader<Q> {
591        let storage = consume_context::<QueriesStorage<Q>>();
592        let query_data = storage
593            .storage
594            .peek_unchecked()
595            .get(&self.query.peek())
596            .cloned()
597            .unwrap();
598
599        QueryReader {
600            state: query_data.state,
601        }
602    }
603
604    /// Suspend this query until it has been **settled**.
605    ///
606    /// This **will** automatically subscribe.
607    pub fn suspend(&self) -> Result<Result<Q::Ok, Q::Err>, RenderError>
608    where
609        Q::Ok: Clone,
610        Q::Err: Clone,
611    {
612        let storage = consume_context::<QueriesStorage<Q>>();
613        let mut storage = storage.storage.write_unchecked();
614        let query_data = storage.get_mut(&self.query.peek()).unwrap();
615
616        // Subscribe if possible
617        if let Some(reactive_context) = ReactiveContext::current() {
618            reactive_context.subscribe(query_data.reactive_contexts.clone());
619        }
620
621        let state = &*query_data.state.borrow();
622        match state {
623            QueryStateData::Pending | QueryStateData::Loading { res: None } => {
624                let suspense_task_clone = query_data.suspense_task.clone();
625                let mut suspense_task = query_data.suspense_task.borrow_mut();
626                let QuerySuspenseData { task, .. } = suspense_task.get_or_insert_with(|| {
627                    let notifier = Arc::new(Notify::new());
628                    let task = spawn({
629                        let notifier = notifier.clone();
630                        async move {
631                            notifier.notified().await;
632                            let _ = suspense_task_clone.borrow_mut().take();
633                        }
634                    });
635                    QuerySuspenseData { notifier, task }
636                });
637                Err(RenderError::Suspended(SuspendedFuture::new(*task)))
638            }
639            QueryStateData::Settled { res, .. } | QueryStateData::Loading { res: Some(res) } => {
640                Ok(res.clone())
641            }
642        }
643    }
644
645    /// Invalidate this query and await its result.
646    ///
647    /// For a `sync` version use [UseQuery::invalidate].
648    pub async fn invalidate_async(&self) -> QueryReader<Q> {
649        let storage = consume_context::<QueriesStorage<Q>>();
650
651        let query = self.query.peek().clone();
652        let query_data = storage
653            .storage
654            .peek_unchecked()
655            .get(&query)
656            .cloned()
657            .unwrap();
658
659        // Run the query
660        QueriesStorage::run_queries(&[(&query, &query_data)]).await;
661
662        QueryReader {
663            state: query_data.state.clone(),
664        }
665    }
666
667    /// Invalidate this query in the background.
668    ///
669    /// For an `async` version use [UseQuery::invalidate_async].
670    pub fn invalidate(&self) {
671        let storage = consume_context::<QueriesStorage<Q>>();
672
673        let query = self.query.peek().clone();
674        let query_data = storage
675            .storage
676            .peek_unchecked()
677            .get(&query)
678            .cloned()
679            .unwrap();
680
681        // Run the query
682        spawn(async move { QueriesStorage::run_queries(&[(&query, &query_data)]).await });
683    }
684}
685
686/// Queries are used to get data asynchronously (e.g external resources such as HTTP APIs), which can later be cached or refreshed.
687///
688/// Important concepts:
689///
690/// ### Stale time
691/// This is how long will a value that is cached, considered to be recent enough.
692/// So in other words, if a value is stale it means that its outdated and therefore it should be refreshed.
693///
694/// By default the stale time is `0ms`, so if a value is cached and a new query subscriber
695/// is interested in this value, it will get refreshed automatically.
696///
697/// See [Query::stale_time].
698///
699/// ### Clean time
700/// This is how long will a value kept cached after there are no more subscribers of that query.
701///
702/// Imagine there is `Subscriber 1` of a query, the data is requested and cached.
703/// But after some seconds the `Subscriber 1` is unmounted, but the data is not cleared as the default clean time is `5min`.
704/// A few seconds later the `Subscriber 1` gets mounted again, it requests the data again but this time it is returned directly from the cache.
705///
706/// See [Query::clean_time].
707///
708/// ### Interval time
709/// This is how often do you want a query to be refreshed in the background automatically.
710/// By default it never refreshes automatically.
711///
712/// See [Query::interval_time].
713pub fn use_query<Q: QueryCapability>(query: Query<Q>) -> UseQuery<Q> {
714    let mut storage = match try_consume_context::<QueriesStorage<Q>>() {
715        Some(storage) => storage,
716        None => provide_root_context(QueriesStorage::<Q>::new_in_root()),
717    };
718
719    let current_query = use_hook(|| Rc::new(RefCell::new(None)));
720
721    let query = use_memo(use_reactive!(|query| {
722        let query_data = storage.insert_or_get_query(query.clone());
723
724        // Update the query tasks if there has been a change in the query
725        if let Some(prev_query) = current_query.borrow_mut().take() {
726            storage.update_tasks(prev_query);
727        }
728
729        // Store this new query
730        current_query.borrow_mut().replace(query.clone());
731
732        // Immediately run the query if enabled and the value is stale
733        if query.enabled && query_data.state.borrow().is_stale(&query) {
734            let query = query.clone();
735            spawn(async move {
736                QueriesStorage::run_queries(&[(&query, &query_data)]).await;
737            });
738        }
739
740        query
741    }));
742
743    // Update the query tasks when the scope is dropped
744    use_drop({
745        move || {
746            storage.update_tasks(query.peek().clone());
747        }
748    });
749
750    UseQuery { query }
751}