dioxus_query/
query.rs

1use core::fmt;
2use std::{
3    cell::{Ref, RefCell},
4    collections::{HashMap, HashSet},
5    future::Future,
6    hash::Hash,
7    mem,
8    rc::Rc,
9    sync::{Arc, Mutex},
10    time::Duration,
11};
12
13use ::warnings::Warning;
14use dioxus_lib::prelude::Task;
15use dioxus_lib::prelude::*;
16use dioxus_lib::signals::{Readable, Writable};
17use dioxus_lib::{
18    hooks::{use_memo, use_reactive},
19    signals::CopyValue,
20};
21use futures_util::stream::{FuturesUnordered, StreamExt};
22use tokio::{sync::Notify, time::Instant};
23
24pub trait QueryCapability
25where
26    Self: 'static + Clone + PartialEq + Hash + Eq,
27{
28    type Ok;
29    type Err;
30    type Keys: Hash + PartialEq + Clone;
31
32    /// Query logic.
33    fn run(&self, keys: &Self::Keys) -> impl Future<Output = Result<Self::Ok, Self::Err>>;
34
35    /// Implement a custom logic to check if this query should be invalidated or not given a [QueryCapability::Keys].
36    fn matches(&self, _keys: &Self::Keys) -> bool {
37        true
38    }
39}
40
41pub enum QueryStateData<Q: QueryCapability> {
42    /// Has not loaded yet.
43    Pending,
44    /// Is loading and may not have a previous settled value.
45    Loading { res: Option<Result<Q::Ok, Q::Err>> },
46    /// Is not loading and has a settled value.
47    Settled {
48        res: Result<Q::Ok, Q::Err>,
49        settlement_instant: Instant,
50    },
51}
52
53impl<Q: QueryCapability> TryFrom<QueryStateData<Q>> for Result<Q::Ok, Q::Err> {
54    type Error = ();
55
56    fn try_from(value: QueryStateData<Q>) -> Result<Self, Self::Error> {
57        match value {
58            QueryStateData::Loading { res: Some(res) } => Ok(res),
59            QueryStateData::Settled { res, .. } => Ok(res),
60            _ => Err(()),
61        }
62    }
63}
64
65impl<Q> fmt::Debug for QueryStateData<Q>
66where
67    Q: QueryCapability,
68    Q::Ok: fmt::Debug,
69    Q::Err: fmt::Debug,
70{
71    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72        match self {
73            Self::Pending => f.write_str("Pending"),
74            Self::Loading { res } => write!(f, "Loading {{ {res:?} }}"),
75            Self::Settled { res, .. } => write!(f, "Settled {{ {res:?} }}"),
76        }
77    }
78}
79
80impl<Q: QueryCapability> QueryStateData<Q> {
81    /// Check if the state is [QueryStateData::Settled] and [Result::Ok].
82    pub fn is_ok(&self) -> bool {
83        matches!(self, QueryStateData::Settled { res: Ok(_), .. })
84    }
85
86    /// Check if the state is [QueryStateData::Settled] and [Result::Err].
87    pub fn is_err(&self) -> bool {
88        matches!(self, QueryStateData::Settled { res: Err(_), .. })
89    }
90
91    /// Check if the state is [QueryStateData::Loading].
92    pub fn is_loading(&self) -> bool {
93        matches!(self, QueryStateData::Loading { .. })
94    }
95
96    /// Check if the state is [QueryStateData::Pending].
97    pub fn is_pending(&self) -> bool {
98        matches!(self, QueryStateData::Pending)
99    }
100
101    /// Check if the state is stale or not, where stale means outdated.
102    pub fn is_stale(&self, query: &Query<Q>) -> bool {
103        match self {
104            QueryStateData::Pending => true,
105            QueryStateData::Loading { .. } => true,
106            QueryStateData::Settled {
107                settlement_instant, ..
108            } => Instant::now().duration_since(*settlement_instant) >= query.stale_time,
109        }
110    }
111
112    /// Get the value as an [Option].
113    pub fn ok(&self) -> Option<&Q::Ok> {
114        match self {
115            Self::Settled { res: Ok(res), .. } => Some(res),
116            Self::Loading { res: Some(Ok(res)) } => Some(res),
117            _ => None,
118        }
119    }
120
121    /// Get the value as an [Result] if possible, otherwise it will panic.
122    pub fn unwrap(&self) -> &Result<Q::Ok, Q::Err> {
123        match self {
124            Self::Loading { res: Some(v) } => v,
125            Self::Settled { res, .. } => res,
126            _ => unreachable!(),
127        }
128    }
129
130    fn into_loading(self) -> QueryStateData<Q> {
131        match self {
132            QueryStateData::Pending => QueryStateData::Loading { res: None },
133            QueryStateData::Loading { res } => QueryStateData::Loading { res },
134            QueryStateData::Settled { res, .. } => QueryStateData::Loading { res: Some(res) },
135        }
136    }
137}
138pub struct QueriesStorage<Q: QueryCapability> {
139    storage: CopyValue<HashMap<Query<Q>, QueryData<Q>>>,
140}
141
142impl<Q: QueryCapability> Copy for QueriesStorage<Q> {}
143
144impl<Q: QueryCapability> Clone for QueriesStorage<Q> {
145    fn clone(&self) -> Self {
146        *self
147    }
148}
149
150struct QuerySuspenseData {
151    notifier: Arc<Notify>,
152    task: Task,
153}
154
155pub struct QueryData<Q: QueryCapability> {
156    state: Rc<RefCell<QueryStateData<Q>>>,
157    reactive_contexts: Arc<Mutex<HashSet<ReactiveContext>>>,
158
159    suspense_task: Rc<RefCell<Option<QuerySuspenseData>>>,
160    interval_task: Rc<RefCell<Option<(Duration, Task)>>>,
161    clean_task: Rc<RefCell<Option<Task>>>,
162}
163
164impl<Q: QueryCapability> Clone for QueryData<Q> {
165    fn clone(&self) -> Self {
166        Self {
167            state: self.state.clone(),
168            reactive_contexts: self.reactive_contexts.clone(),
169
170            suspense_task: self.suspense_task.clone(),
171            interval_task: self.interval_task.clone(),
172            clean_task: self.clean_task.clone(),
173        }
174    }
175}
176
177impl<Q: QueryCapability> QueriesStorage<Q> {
178    fn new_in_root() -> Self {
179        Self {
180            storage: CopyValue::new_in_scope(HashMap::default(), ScopeId::ROOT),
181        }
182    }
183
184    fn insert_or_get_query(&mut self, query: Query<Q>) -> QueryData<Q> {
185        let query_clone = query.clone();
186        let mut storage = self.storage.write();
187
188        let query_data = storage.entry(query).or_insert_with(|| QueryData {
189            state: Rc::new(RefCell::new(QueryStateData::Pending)),
190            reactive_contexts: Arc::default(),
191            suspense_task: Rc::default(),
192            interval_task: Rc::default(),
193            clean_task: Rc::default(),
194        });
195        let query_data_clone = query_data.clone();
196
197        // Cancel clean task
198        if let Some(clean_task) = query_data.clean_task.take() {
199            clean_task.cancel();
200        }
201
202        // Start an interval task if necessary
203        // If multiple queries subscribers use different intervals the interval task
204        // will run using the shortest interval
205        let interval = query_clone.interval_time;
206        let interval_enabled = query_clone.interval_time != Duration::MAX;
207        let interval_task = &mut *query_data.interval_task.borrow_mut();
208
209        let create_interval_task = match interval_task {
210            None if interval_enabled => true,
211            Some((current_interval, current_interval_task)) if interval_enabled => {
212                let new_interval_is_shorter = *current_interval > interval;
213                if new_interval_is_shorter {
214                    current_interval_task.cancel();
215                    *interval_task = None;
216                }
217                new_interval_is_shorter
218            }
219            _ => false,
220        };
221        if create_interval_task {
222            let task = spawn_forever(async move {
223                loop {
224                    // Wait as long as the stale time is configured
225                    tokio::time::sleep(interval).await;
226
227                    // Run the query
228                    QueriesStorage::<Q>::run_queries(&[(&query_clone, &query_data_clone)]).await;
229                }
230            })
231            .expect("Failed to spawn interval task.");
232            *interval_task = Some((interval, task));
233        }
234
235        query_data.clone()
236    }
237
238    fn update_tasks(&mut self, query: Query<Q>) {
239        let mut storage_clone = self.storage;
240        let mut storage = self.storage.write();
241
242        let query_data = storage.get_mut(&query).unwrap();
243
244        // Cancel interval task
245        if let Some((_, interval_task)) = query_data.interval_task.take() {
246            interval_task.cancel();
247        }
248
249        // Spawn clean up task if there no more reactive contexts
250        if query_data.reactive_contexts.lock().unwrap().is_empty() {
251            *query_data.clean_task.borrow_mut() = spawn_forever(async move {
252                // Wait as long as the stale time is configured
253                tokio::time::sleep(query.clean_time).await;
254
255                // Finally clear the query
256                let mut storage = storage_clone.write();
257                storage.remove(&query);
258            });
259        }
260    }
261
262    pub async fn get(get_query: GetQuery<Q>) -> QueryReader<Q> {
263        let query: Query<Q> = get_query.into();
264
265        let mut storage = match try_consume_context::<QueriesStorage<Q>>() {
266            Some(storage) => storage,
267            None => provide_root_context(QueriesStorage::<Q>::new_in_root()),
268        };
269
270        let query_data = storage
271            .storage
272            .write()
273            .entry(query.clone())
274            .or_insert_with(|| QueryData {
275                state: Rc::new(RefCell::new(QueryStateData::Pending)),
276                reactive_contexts: Arc::default(),
277                suspense_task: Rc::default(),
278                interval_task: Rc::default(),
279                clean_task: Rc::default(),
280            })
281            .clone();
282
283        // Run the query if the value is stale
284        if query_data.state.borrow().is_stale(&query) {
285            // Set to Loading
286            let res = mem::replace(&mut *query_data.state.borrow_mut(), QueryStateData::Pending)
287                .into_loading();
288            *query_data.state.borrow_mut() = res;
289            for reactive_context in query_data.reactive_contexts.lock().unwrap().iter() {
290                reactive_context.mark_dirty();
291            }
292
293            // Run
294            let res = query.query.run(&query.keys).await;
295
296            // Set to Settled
297            *query_data.state.borrow_mut() = QueryStateData::Settled {
298                res,
299                settlement_instant: Instant::now(),
300            };
301            for reactive_context in query_data.reactive_contexts.lock().unwrap().iter() {
302                reactive_context.mark_dirty();
303            }
304        }
305
306        // Notify the suspense task if any
307        if let Some(suspense_task) = &*query_data.suspense_task.borrow() {
308            suspense_task.notifier.notify_waiters();
309        };
310
311        // Spawn clean up task if there no more reactive contexts
312        if query_data.reactive_contexts.lock().unwrap().is_empty() {
313            *query_data.clean_task.borrow_mut() = spawn_forever(async move {
314                // Wait as long as the stale time is configured
315                tokio::time::sleep(query.clean_time).await;
316
317                // Finally clear the query
318                let mut storage = storage.storage.write();
319                storage.remove(&query);
320            });
321        }
322
323        QueryReader {
324            state: query_data.state,
325        }
326    }
327
328    pub async fn invalidate_all() {
329        let storage = consume_context::<QueriesStorage<Q>>();
330
331        // Get all the queries
332        let matching_queries = storage
333            .storage
334            .read()
335            .clone()
336            .into_iter()
337            .collect::<Vec<_>>();
338        let matching_queries = matching_queries
339            .iter()
340            .map(|(q, d)| (q, d))
341            .collect::<Vec<_>>();
342
343        // Invalidate the queries
344        Self::run_queries(&matching_queries).await
345    }
346
347    pub async fn invalidate_matching(matching_keys: Q::Keys) {
348        let storage = consume_context::<QueriesStorage<Q>>();
349
350        // Get those queries that match
351        let mut matching_queries = Vec::new();
352        for (query, data) in storage.storage.read().iter() {
353            if query.query.matches(&matching_keys) {
354                matching_queries.push((query.clone(), data.clone()));
355            }
356        }
357        let matching_queries = matching_queries
358            .iter()
359            .map(|(q, d)| (q, d))
360            .collect::<Vec<_>>();
361
362        // Invalidate the queries
363        Self::run_queries(&matching_queries).await
364    }
365
366    async fn run_queries(queries: &[(&Query<Q>, &QueryData<Q>)]) {
367        let tasks = FuturesUnordered::new();
368
369        for (query, query_data) in queries {
370            // Set to Loading
371            let res = mem::replace(&mut *query_data.state.borrow_mut(), QueryStateData::Pending)
372                .into_loading();
373            *query_data.state.borrow_mut() = res;
374            for reactive_context in query_data.reactive_contexts.lock().unwrap().iter() {
375                reactive_context.mark_dirty();
376            }
377
378            tasks.push(Box::pin(async move {
379                // Run
380                let res = query.query.run(&query.keys).await;
381
382                // Set to settled
383                *query_data.state.borrow_mut() = QueryStateData::Settled {
384                    res,
385                    settlement_instant: Instant::now(),
386                };
387                for reactive_context in query_data.reactive_contexts.lock().unwrap().iter() {
388                    reactive_context.mark_dirty();
389                }
390            }));
391        }
392
393        tasks.count().await;
394    }
395}
396
397pub struct GetQuery<Q: QueryCapability> {
398    query: Q,
399    keys: Q::Keys,
400
401    stale_time: Duration,
402    clean_time: Duration,
403}
404
405impl<Q: QueryCapability> GetQuery<Q> {
406    pub fn new(keys: Q::Keys, query: Q) -> Self {
407        Self {
408            query,
409            keys,
410            stale_time: Duration::ZERO,
411            clean_time: Duration::ZERO,
412        }
413    }
414    /// For how long is the data considered stale. If a query subscriber is mounted and the data is stale, it will re run the query.
415    ///
416    /// Defaults to [Duration::ZERO], meaning it is marked stale immediately.
417    pub fn stale_time(self, stale_time: Duration) -> Self {
418        Self { stale_time, ..self }
419    }
420
421    /// For how long the data is kept cached after there are no more query subscribers.
422    ///
423    /// Defaults to [Duration::ZERO], meaning it clears automatically.
424    pub fn clean_time(self, clean_time: Duration) -> Self {
425        Self { clean_time, ..self }
426    }
427}
428
429impl<Q: QueryCapability> From<GetQuery<Q>> for Query<Q> {
430    fn from(value: GetQuery<Q>) -> Self {
431        Query {
432            query: value.query,
433            keys: value.keys,
434
435            enabled: true,
436
437            stale_time: value.stale_time,
438            clean_time: value.clean_time,
439            interval_time: Duration::MAX,
440        }
441    }
442}
443#[derive(PartialEq, Clone)]
444pub struct Query<Q: QueryCapability> {
445    query: Q,
446    keys: Q::Keys,
447
448    enabled: bool,
449
450    stale_time: Duration,
451    clean_time: Duration,
452    interval_time: Duration,
453}
454
455impl<Q: QueryCapability> Eq for Query<Q> {}
456impl<Q: QueryCapability> Hash for Query<Q> {
457    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
458        self.query.hash(state);
459        self.keys.hash(state);
460
461        self.enabled.hash(state);
462
463        self.stale_time.hash(state);
464        self.clean_time.hash(state);
465
466        // Intentionally left out as intervals can vary from one query subscriber to another
467        // self.interval_time.hash(state);
468    }
469}
470
471impl<Q: QueryCapability> Query<Q> {
472    pub fn new(keys: Q::Keys, query: Q) -> Self {
473        Self {
474            query,
475            keys,
476            enabled: true,
477            stale_time: Duration::ZERO,
478            clean_time: Duration::from_secs(5 * 60),
479            interval_time: Duration::MAX,
480        }
481    }
482
483    /// Enable or disable this query so that it doesnt automatically run.
484    ///
485    /// Defaults to `true`.
486    pub fn enable(self, enabled: bool) -> Self {
487        Self { enabled, ..self }
488    }
489
490    /// For how long is the data considered stale. If a query subscriber is mounted and the data is stale, it will re run the query
491    /// otherwise it return the cached data.
492    ///
493    /// Defaults to [Duration::ZERO], meaning it is marked stale immediately after it has been used.
494    pub fn stale_time(self, stale_time: Duration) -> Self {
495        Self { stale_time, ..self }
496    }
497
498    /// For how long the data is kept cached after there are no more query subscribers.
499    ///
500    /// Defaults to `5min`, meaning it clears automatically after 5 minutes of no subscribers to it.
501    pub fn clean_time(self, clean_time: Duration) -> Self {
502        Self { clean_time, ..self }
503    }
504
505    /// Every how often the query reruns.
506    ///
507    /// Defaults to [Duration::MAX], meaning it never re runs automatically.
508    ///
509    /// **Note**: If multiple subscribers of the same query use different intervals, only the shortest one will be used.
510    pub fn interval_time(self, interval_time: Duration) -> Self {
511        Self {
512            interval_time,
513            ..self
514        }
515    }
516}
517
518pub struct QueryReader<Q: QueryCapability> {
519    state: Rc<RefCell<QueryStateData<Q>>>,
520}
521
522impl<Q: QueryCapability> QueryReader<Q> {
523    pub fn state(&self) -> Ref<QueryStateData<Q>> {
524        self.state.borrow()
525    }
526
527    /// Get the result of the query.
528    ///
529    /// **This method will panic if the query is not settled.**
530    pub fn as_settled(&self) -> Ref<Result<Q::Ok, Q::Err>> {
531        Ref::map(self.state.borrow(), |state| match state {
532            QueryStateData::Settled { res, .. } => res,
533            _ => panic!("Query is not settled."),
534        })
535    }
536}
537
538pub struct UseQuery<Q: QueryCapability> {
539    query: Memo<Query<Q>>,
540}
541
542impl<Q: QueryCapability> Clone for UseQuery<Q> {
543    fn clone(&self) -> Self {
544        *self
545    }
546}
547
548impl<Q: QueryCapability> Copy for UseQuery<Q> {}
549
550impl<Q: QueryCapability> UseQuery<Q> {
551    /// Read the [Query] state.
552    ///
553    /// This **will** automatically subscribe.
554    /// If you want a **non-subscribing** method have a look at [UseQuery::peek].
555    pub fn read(&self) -> QueryReader<Q> {
556        let storage = consume_context::<QueriesStorage<Q>>();
557        let query_data = storage
558            .storage
559            .peek_unchecked()
560            .get(&self.query.peek())
561            .cloned()
562            .unwrap();
563
564        // Subscribe if possible
565        if let Some(reactive_context) = ReactiveContext::current() {
566            reactive_context.subscribe(query_data.reactive_contexts);
567        }
568
569        QueryReader {
570            state: query_data.state,
571        }
572    }
573
574    /// Read the [Query] state.
575    ///
576    /// This **will not** automatically subscribe.
577    /// If you want a **subscribing** method have a look at [UseQuery::read].
578    pub fn peek(&self) -> QueryReader<Q> {
579        let storage = consume_context::<QueriesStorage<Q>>();
580        let query_data = storage
581            .storage
582            .peek_unchecked()
583            .get(&self.query.peek())
584            .cloned()
585            .unwrap();
586
587        QueryReader {
588            state: query_data.state,
589        }
590    }
591
592    /// Suspend this query until it has been **settled**.
593    ///
594    /// This **will** automatically subscribe.
595    pub fn suspend(&self) -> Result<Result<Q::Ok, Q::Err>, RenderError>
596    where
597        Q::Ok: Clone,
598        Q::Err: Clone,
599    {
600        let _allow_write_in_component_body =
601            ::warnings::Allow::new(warnings::signal_write_in_component_body::ID);
602
603        let storage = consume_context::<QueriesStorage<Q>>();
604        let mut storage = storage.storage.write_unchecked();
605        let query_data = storage.get_mut(&self.query.peek()).unwrap();
606
607        // Subscribe if possible
608        if let Some(reactive_context) = ReactiveContext::current() {
609            reactive_context.subscribe(query_data.reactive_contexts.clone());
610        }
611
612        let state = &*query_data.state.borrow();
613        match state {
614            QueryStateData::Pending | QueryStateData::Loading { res: None } => {
615                let suspense_task_clone = query_data.suspense_task.clone();
616                let mut suspense_task = query_data.suspense_task.borrow_mut();
617                let QuerySuspenseData { task, .. } = suspense_task.get_or_insert_with(|| {
618                    let notifier = Arc::new(Notify::new());
619                    let task = spawn({
620                        let notifier = notifier.clone();
621                        async move {
622                            notifier.notified().await;
623                            let _ = suspense_task_clone.borrow_mut().take();
624                        }
625                    });
626                    QuerySuspenseData { notifier, task }
627                });
628                Err(RenderError::Suspended(SuspendedFuture::new(*task)))
629            }
630            QueryStateData::Settled { res, .. } | QueryStateData::Loading { res: Some(res) } => {
631                Ok(res.clone())
632            }
633        }
634    }
635
636    /// Invalidate this query and await its result.
637    ///
638    /// For a `sync` version use [UseQuery::invalidate].
639    pub async fn invalidate_async(&self) -> QueryReader<Q> {
640        let storage = consume_context::<QueriesStorage<Q>>();
641
642        let query = self.query.peek().clone();
643        let query_data = storage
644            .storage
645            .peek_unchecked()
646            .get(&query)
647            .cloned()
648            .unwrap();
649
650        // Run the query
651        QueriesStorage::run_queries(&[(&query, &query_data)]).await;
652
653        QueryReader {
654            state: query_data.state.clone(),
655        }
656    }
657
658    /// Invalidate this query in the background.
659    ///
660    /// For an `async` version use [UseQuery::invalidate_async].
661    pub fn invalidate(&self) {
662        let storage = consume_context::<QueriesStorage<Q>>();
663
664        let query = self.query.peek().clone();
665        let query_data = storage
666            .storage
667            .peek_unchecked()
668            .get(&query)
669            .cloned()
670            .unwrap();
671
672        // Run the query
673        spawn(async move { QueriesStorage::run_queries(&[(&query, &query_data)]).await });
674    }
675}
676
677/// Queries are used to get data asynchronously (e.g external resources such as HTTP APIs), which can later be cached or refreshed.
678///
679/// Important concepts:
680///
681/// ### Stale time
682/// This is how long will a value that is cached, considered to be recent enough.
683/// So in other words, if a value is stale it means that its outdated and therefore it should be refreshed.
684///
685/// By default the stale time is `0ms`, so if a value is cached and a new query subscriber
686/// is interested in this value, it will get refreshed automatically.
687///
688/// See [Query::stale_time].
689///
690/// ### Clean time
691/// This is how long will a value kept cached after there are no more subscribers of that query.
692///
693/// Imagine there is `Subscriber 1` of a query, the data is requested and cached.
694/// But after some seconds the `Subscriber 1` is unmounted, but the data is not cleared as the default clean time is `5min`.
695/// A few seconds later the `Subscriber 1` gets mounted again, it requests the data again but this time it is returned directly from the cache.
696///
697/// See [Query::clean_time].
698///
699/// ### Interval time
700/// This is how often do you want a query to be refreshed in the background automatically.
701/// By default it never refreshes automatically.
702///
703/// See [Query::interval_time].
704pub fn use_query<Q: QueryCapability>(query: Query<Q>) -> UseQuery<Q> {
705    let mut storage = match try_consume_context::<QueriesStorage<Q>>() {
706        Some(storage) => storage,
707        None => provide_root_context(QueriesStorage::<Q>::new_in_root()),
708    };
709
710    let current_query = use_hook(|| Rc::new(RefCell::new(None)));
711
712    let query = use_memo(use_reactive!(|query| {
713        let query_data = storage.insert_or_get_query(query.clone());
714
715        // Update the query tasks if there has been a change in the query
716        if let Some(prev_query) = current_query.borrow_mut().take() {
717            storage.update_tasks(prev_query);
718        }
719
720        // Store this new query
721        current_query.borrow_mut().replace(query.clone());
722
723        // Immediately run the query if enabled and the value is stale
724        if query.enabled && query_data.state.borrow().is_stale(&query) {
725            let query = query.clone();
726            spawn(async move {
727                QueriesStorage::run_queries(&[(&query, &query_data)]).await;
728            });
729        }
730
731        query
732    }));
733
734    // Update the query tasks when the scope is dropped
735    use_drop({
736        move || {
737            storage.update_tasks(query.peek().clone());
738        }
739    });
740
741    UseQuery { query }
742}