Skip to main content

freya_query/
query.rs

1use core::fmt;
2use std::{
3    cell::{
4        Ref,
5        RefCell,
6    },
7    collections::HashMap,
8    future::Future,
9    hash::Hash,
10    mem,
11    rc::Rc,
12    time::{
13        Duration,
14        Instant,
15    },
16};
17
18use async_io::Timer;
19use freya_core::{
20    integration::FxHashSet,
21    lifecycle::context::{
22        consume_context,
23        provide_context_for_scope_id,
24        try_consume_context,
25    },
26    prelude::*,
27    scope_id::ScopeId,
28};
29use futures_util::stream::{
30    FuturesUnordered,
31    StreamExt,
32};
33
34pub trait QueryCapability
35where
36    Self: 'static + Clone + PartialEq + Hash + Eq,
37{
38    type Ok;
39    type Err;
40    type Keys: Hash + PartialEq + Clone;
41
42    /// Query logic.
43    fn run(&self, keys: &Self::Keys) -> impl Future<Output = Result<Self::Ok, Self::Err>>;
44
45    /// Implement a custom logic to check if this query should be invalidated or not given a [QueryCapability::Keys].
46    fn matches(&self, _keys: &Self::Keys) -> bool {
47        true
48    }
49}
50
51pub enum QueryStateData<Q: QueryCapability> {
52    /// Has not loaded yet.
53    Pending,
54    /// Is loading and may not have a previous settled value.
55    Loading { res: Option<Result<Q::Ok, Q::Err>> },
56    /// Is not loading and has a settled value.
57    Settled {
58        res: Result<Q::Ok, Q::Err>,
59        settlement_instant: Instant,
60    },
61}
62
63impl<Q: QueryCapability> TryFrom<QueryStateData<Q>> for Result<Q::Ok, Q::Err> {
64    type Error = ();
65
66    fn try_from(value: QueryStateData<Q>) -> Result<Self, Self::Error> {
67        match value {
68            QueryStateData::Loading { res: Some(res) } => Ok(res),
69            QueryStateData::Settled { res, .. } => Ok(res),
70            _ => Err(()),
71        }
72    }
73}
74
75impl<Q> fmt::Debug for QueryStateData<Q>
76where
77    Q: QueryCapability,
78    Q::Ok: fmt::Debug,
79    Q::Err: fmt::Debug,
80{
81    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
82        match self {
83            Self::Pending => f.write_str("Pending"),
84            Self::Loading { res } => write!(f, "Loading {{ {res:?} }}"),
85            Self::Settled { res, .. } => write!(f, "Settled {{ {res:?} }}"),
86        }
87    }
88}
89
90impl<Q: QueryCapability> QueryStateData<Q> {
91    /// Check if the state is [QueryStateData::Settled] and [Result::Ok].
92    pub fn is_ok(&self) -> bool {
93        matches!(self, QueryStateData::Settled { res: Ok(_), .. })
94    }
95
96    /// Check if the state is [QueryStateData::Settled] and [Result::Err].
97    pub fn is_err(&self) -> bool {
98        matches!(self, QueryStateData::Settled { res: Err(_), .. })
99    }
100
101    /// Check if the state is [QueryStateData::Loading].
102    pub fn is_loading(&self) -> bool {
103        matches!(self, QueryStateData::Loading { .. })
104    }
105
106    /// Check if the state is [QueryStateData::Pending].
107    pub fn is_pending(&self) -> bool {
108        matches!(self, QueryStateData::Pending)
109    }
110
111    /// Check if the state is stale or not, where stale means outdated.
112    pub fn is_stale(&self, query: &Query<Q>) -> bool {
113        match self {
114            QueryStateData::Pending => true,
115            QueryStateData::Loading { .. } => true,
116            QueryStateData::Settled {
117                settlement_instant, ..
118            } => Instant::now().duration_since(*settlement_instant) >= query.stale_time,
119        }
120    }
121
122    /// Get the value as an [Option].
123    pub fn ok(&self) -> Option<&Q::Ok> {
124        match self {
125            Self::Settled { res: Ok(res), .. } => Some(res),
126            Self::Loading { res: Some(Ok(res)) } => Some(res),
127            _ => None,
128        }
129    }
130
131    /// Get the value as an [Result] if possible, otherwise it will panic.
132    pub fn unwrap(&self) -> &Result<Q::Ok, Q::Err> {
133        match self {
134            Self::Loading { res: Some(v) } => v,
135            Self::Settled { res, .. } => res,
136            _ => unreachable!(),
137        }
138    }
139
140    fn into_loading(self) -> QueryStateData<Q> {
141        match self {
142            QueryStateData::Pending => QueryStateData::Loading { res: None },
143            QueryStateData::Loading { res } => QueryStateData::Loading { res },
144            QueryStateData::Settled { res, .. } => QueryStateData::Loading { res: Some(res) },
145        }
146    }
147}
148
149pub struct QueriesStorage<Q: QueryCapability> {
150    storage: State<HashMap<Query<Q>, QueryData<Q>>>,
151}
152
153impl<Q: QueryCapability> Copy for QueriesStorage<Q> {}
154
155impl<Q: QueryCapability> Clone for QueriesStorage<Q> {
156    fn clone(&self) -> Self {
157        *self
158    }
159}
160
161pub struct QueryData<Q: QueryCapability> {
162    state: Rc<RefCell<QueryStateData<Q>>>,
163    reactive_contexts: Rc<RefCell<FxHashSet<ReactiveContext>>>,
164
165    interval_task: Rc<RefCell<Option<(Duration, TaskHandle)>>>,
166    clean_task: Rc<RefCell<Option<TaskHandle>>>,
167}
168
169impl<Q: QueryCapability> Clone for QueryData<Q> {
170    fn clone(&self) -> Self {
171        Self {
172            state: self.state.clone(),
173            reactive_contexts: self.reactive_contexts.clone(),
174
175            interval_task: self.interval_task.clone(),
176            clean_task: self.clean_task.clone(),
177        }
178    }
179}
180
181impl<Q: QueryCapability> QueriesStorage<Q> {
182    fn new_in_root() -> Self {
183        Self {
184            storage: State::create_global(HashMap::default()),
185        }
186    }
187
188    fn insert_or_get_query(&mut self, query: Query<Q>) -> QueryData<Q> {
189        let query_clone = query.clone();
190        let mut storage = self.storage.write_unchecked();
191
192        let query_data = storage.entry(query).or_insert_with(|| QueryData {
193            state: Rc::new(RefCell::new(QueryStateData::Pending)),
194            reactive_contexts: Rc::new(RefCell::new(FxHashSet::default())),
195            interval_task: Rc::default(),
196            clean_task: Rc::default(),
197        });
198        let query_data_clone = query_data.clone();
199
200        // Cancel clean task
201        if let Some(clean_task) = query_data.clean_task.take() {
202            clean_task.cancel();
203        }
204
205        // Start an interval task if necessary
206        // If multiple queries subscribers use different intervals the interval task
207        // will run using the shortest interval
208        let interval = query_clone.interval_time;
209        let interval_enabled = query_clone.interval_time != Duration::MAX;
210        let interval_task = &mut *query_data.interval_task.borrow_mut();
211
212        let create_interval_task = match interval_task {
213            None if interval_enabled => true,
214            Some((current_interval, current_interval_task)) if interval_enabled => {
215                let new_interval_is_shorter = *current_interval > interval;
216                if new_interval_is_shorter {
217                    current_interval_task.cancel();
218                    *interval_task = None;
219                }
220                new_interval_is_shorter
221            }
222            _ => false,
223        };
224        if create_interval_task {
225            let task = spawn_forever(async move {
226                loop {
227                    // Wait as long as the stale time is configured
228                    Timer::after(interval).await;
229
230                    // Run the query
231                    QueriesStorage::<Q>::run_queries(&[(&query_clone, &query_data_clone)]).await;
232                }
233            });
234            *interval_task = Some((interval, task));
235        }
236
237        query_data.clone()
238    }
239
240    fn update_tasks(&mut self, query: Query<Q>) {
241        let storage_clone = self.storage;
242        let mut storage = self.storage.write_unchecked();
243
244        let query_data = storage.get_mut(&query).unwrap();
245
246        // Cancel interval task
247        if let Some((_, interval_task)) = query_data.interval_task.take() {
248            interval_task.cancel();
249        }
250
251        // Spawn clean up task if there no more reactive contexts
252        if query_data.reactive_contexts.borrow().is_empty() {
253            *query_data.clean_task.borrow_mut() = Some(spawn_forever(async move {
254                // Wait as long as the stale time is configured
255                Timer::after(query.clean_time).await;
256
257                // Finally clear the query
258                let mut storage = storage_clone.write_unchecked();
259                storage.remove(&query);
260            }));
261        }
262    }
263
264    pub async fn get(get_query: GetQuery<Q>) -> QueryReader<Q> {
265        let query: Query<Q> = get_query.into();
266
267        let mut storage = match try_consume_context::<QueriesStorage<Q>>() {
268            Some(storage) => storage,
269            None => {
270                provide_context_for_scope_id(
271                    QueriesStorage::<Q>::new_in_root(),
272                    Some(ScopeId::ROOT),
273                );
274                try_consume_context::<QueriesStorage<Q>>().unwrap()
275            }
276        };
277
278        let mut map = storage.storage.write();
279        let query_data = map
280            .entry(query.clone())
281            .or_insert_with(|| QueryData {
282                state: Rc::new(RefCell::new(QueryStateData::Pending)),
283                reactive_contexts: Rc::new(RefCell::new(FxHashSet::default())),
284                interval_task: Rc::default(),
285                clean_task: Rc::default(),
286            })
287            .clone();
288
289        // Run the query if the value is stale
290        if query_data.state.borrow().is_stale(&query) {
291            // Set to Loading
292            let res = mem::replace(&mut *query_data.state.borrow_mut(), QueryStateData::Pending)
293                .into_loading();
294            *query_data.state.borrow_mut() = res;
295            for reactive_context in query_data.reactive_contexts.borrow().iter() {
296                reactive_context.notify();
297            }
298
299            // Run
300            let res = query.query.run(&query.keys).await;
301
302            // Set to Settled
303            *query_data.state.borrow_mut() = QueryStateData::Settled {
304                res,
305                settlement_instant: Instant::now(),
306            };
307            for reactive_context in query_data.reactive_contexts.borrow().iter() {
308                reactive_context.notify();
309            }
310        }
311
312        // Spawn clean up task if there no more reactive contexts
313        if query_data.reactive_contexts.borrow().is_empty() {
314            *query_data.clean_task.borrow_mut() = Some(spawn_forever(async move {
315                // Wait as long as the stale time is configured
316                Timer::after(query.clean_time).await;
317
318                // Finally clear the query
319                let mut storage = storage.storage.write_unchecked();
320                storage.remove(&query);
321            }));
322        }
323
324        QueryReader {
325            state: query_data.state,
326        }
327    }
328
329    pub async fn invalidate_all() {
330        let storage = consume_context::<QueriesStorage<Q>>();
331
332        // Get all the queries
333        let matching_queries = storage
334            .storage
335            .read()
336            .clone()
337            .into_iter()
338            .collect::<Vec<_>>();
339        let matching_queries = matching_queries
340            .iter()
341            .map(|(q, d)| (q, d))
342            .collect::<Vec<_>>();
343
344        // Invalidate the queries
345        Self::run_queries(&matching_queries).await
346    }
347
348    pub async fn invalidate_matching(matching_keys: Q::Keys) {
349        let storage = consume_context::<QueriesStorage<Q>>();
350
351        // Get those queries that match
352        let mut matching_queries = Vec::new();
353        for (query, data) in storage.storage.read().iter() {
354            if query.query.matches(&matching_keys) {
355                matching_queries.push((query.clone(), data.clone()));
356            }
357        }
358        let matching_queries = matching_queries
359            .iter()
360            .map(|(q, d)| (q, d))
361            .collect::<Vec<_>>();
362
363        // Invalidate the queries
364        Self::run_queries(&matching_queries).await
365    }
366
367    async fn run_queries(queries: &[(&Query<Q>, &QueryData<Q>)]) {
368        let tasks = FuturesUnordered::new();
369
370        for (query, query_data) in queries {
371            // Set to Loading
372            let res = mem::replace(&mut *query_data.state.borrow_mut(), QueryStateData::Pending)
373                .into_loading();
374            *query_data.state.borrow_mut() = res;
375            for reactive_context in query_data.reactive_contexts.borrow().iter() {
376                reactive_context.notify();
377            }
378
379            tasks.push(Box::pin(async move {
380                // Run
381                let res = query.query.run(&query.keys).await;
382
383                // Set to settled
384                *query_data.state.borrow_mut() = QueryStateData::Settled {
385                    res,
386                    settlement_instant: Instant::now(),
387                };
388                for reactive_context in query_data.reactive_contexts.borrow().iter() {
389                    reactive_context.notify();
390                }
391            }));
392        }
393
394        tasks.count().await;
395    }
396}
397
398pub struct GetQuery<Q: QueryCapability> {
399    query: Q,
400    keys: Q::Keys,
401
402    stale_time: Duration,
403    clean_time: Duration,
404}
405
406impl<Q: QueryCapability> GetQuery<Q> {
407    pub fn new(keys: Q::Keys, query: Q) -> Self {
408        Self {
409            query,
410            keys,
411            stale_time: Duration::ZERO,
412            clean_time: Duration::ZERO,
413        }
414    }
415    /// For how long is the data considered stale. If a query subscriber is mounted and the data is stale, it will re run the query.
416    ///
417    /// Defaults to [Duration::ZERO], meaning it is marked stale immediately.
418    pub fn stale_time(self, stale_time: Duration) -> Self {
419        Self { stale_time, ..self }
420    }
421
422    /// For how long the data is kept cached after there are no more query subscribers.
423    ///
424    /// Defaults to [Duration::ZERO], meaning it clears automatically.
425    pub fn clean_time(self, clean_time: Duration) -> Self {
426        Self { clean_time, ..self }
427    }
428}
429
430impl<Q: QueryCapability> From<GetQuery<Q>> for Query<Q> {
431    fn from(value: GetQuery<Q>) -> Self {
432        Query {
433            query: value.query,
434            keys: value.keys,
435
436            enabled: true,
437
438            stale_time: value.stale_time,
439            clean_time: value.clean_time,
440            interval_time: Duration::MAX,
441        }
442    }
443}
444#[derive(PartialEq, Clone)]
445pub struct Query<Q: QueryCapability> {
446    query: Q,
447    keys: Q::Keys,
448
449    enabled: bool,
450
451    stale_time: Duration,
452    clean_time: Duration,
453    interval_time: Duration,
454}
455
456impl<Q: QueryCapability> Eq for Query<Q> {}
457impl<Q: QueryCapability> Hash for Query<Q> {
458    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
459        self.query.hash(state);
460        self.keys.hash(state);
461
462        self.enabled.hash(state);
463
464        self.stale_time.hash(state);
465        self.clean_time.hash(state);
466
467        // Intentionally left out as intervals can vary from one query subscriber to another
468        // self.interval_time.hash(state);
469    }
470}
471
472impl<Q: QueryCapability> Query<Q> {
473    pub fn new(keys: Q::Keys, query: Q) -> Self {
474        Self {
475            query,
476            keys,
477            enabled: true,
478            stale_time: Duration::ZERO,
479            clean_time: Duration::from_secs(5 * 60),
480            interval_time: Duration::MAX,
481        }
482    }
483
484    /// Enable or disable this query so that it doesnt automatically run.
485    ///
486    /// Defaults to `true`.
487    pub fn enable(self, enabled: bool) -> Self {
488        Self { enabled, ..self }
489    }
490
491    /// For how long is the data considered stale. If a query subscriber is mounted and the data is stale, it will re run the query
492    /// otherwise it return the cached data.
493    ///
494    /// Defaults to [Duration::ZERO], meaning it is marked stale immediately after it has been used.
495    pub fn stale_time(self, stale_time: Duration) -> Self {
496        Self { stale_time, ..self }
497    }
498
499    /// For how long the data is kept cached after there are no more query subscribers.
500    ///
501    /// Defaults to `5min`, meaning it clears automatically after 5 minutes of no subscribers to it.
502    pub fn clean_time(self, clean_time: Duration) -> Self {
503        Self { clean_time, ..self }
504    }
505
506    /// Every how often the query reruns.
507    ///
508    /// Defaults to [Duration::MAX], meaning it never re runs automatically.
509    ///
510    /// **Note**: If multiple subscribers of the same query use different intervals, only the shortest one will be used.
511    pub fn interval_time(self, interval_time: Duration) -> Self {
512        Self {
513            interval_time,
514            ..self
515        }
516    }
517}
518
519pub struct QueryReader<Q: QueryCapability> {
520    state: Rc<RefCell<QueryStateData<Q>>>,
521}
522
523impl<Q: QueryCapability> QueryReader<Q> {
524    pub fn state(&'_ self) -> Ref<'_, QueryStateData<Q>> {
525        self.state.borrow()
526    }
527
528    /// Get the result of the query.
529    ///
530    /// **This method will panic if the query is not settled.**
531    pub fn as_settled(&'_ self) -> Ref<'_, Result<Q::Ok, Q::Err>> {
532        Ref::map(self.state.borrow(), |state| match state {
533            QueryStateData::Settled { res, .. } => res,
534            _ => panic!("Query is not settled."),
535        })
536    }
537}
538
539pub struct UseQuery<Q: QueryCapability> {
540    query: State<Query<Q>>,
541}
542
543impl<Q: QueryCapability> Clone for UseQuery<Q> {
544    fn clone(&self) -> Self {
545        *self
546    }
547}
548
549impl<Q: QueryCapability> Copy for UseQuery<Q> {}
550
551impl<Q: QueryCapability> UseQuery<Q> {
552    /// Read the [Query] state.
553    ///
554    /// This **will** automatically subscribe.
555    /// If you want a **non-subscribing** method have a look at [UseQuery::peek].
556    pub fn read(&self) -> QueryReader<Q> {
557        let storage = consume_context::<QueriesStorage<Q>>();
558        let map = storage.storage.peek();
559        let query_data = map.get(&self.query.peek()).cloned().unwrap();
560
561        // Subscribe if possible
562        if let Some(mut reactive_context) = ReactiveContext::try_current() {
563            reactive_context.subscribe(&query_data.reactive_contexts);
564        }
565
566        QueryReader {
567            state: query_data.state,
568        }
569    }
570
571    /// Read the [Query] state.
572    ///
573    /// This **will not** automatically subscribe.
574    /// If you want a **subscribing** method have a look at [UseQuery::read].
575    pub fn peek(&self) -> QueryReader<Q> {
576        let storage = consume_context::<QueriesStorage<Q>>();
577        let map = storage.storage.peek();
578        let query_data = map.get(&self.query.peek()).cloned().unwrap();
579
580        QueryReader {
581            state: query_data.state,
582        }
583    }
584
585    /// Invalidate this query and await its result.
586    ///
587    /// For a `sync` version use [UseQuery::invalidate].
588    pub async fn invalidate_async(&self) -> QueryReader<Q> {
589        let storage = consume_context::<QueriesStorage<Q>>();
590
591        let query = self.query.peek().clone();
592        let map = storage.storage.peek();
593        let query_data = map.get(&query).cloned().unwrap();
594
595        // Run the query
596        QueriesStorage::run_queries(&[(&query, &query_data)]).await;
597
598        QueryReader {
599            state: query_data.state.clone(),
600        }
601    }
602
603    /// Invalidate this query in the background.
604    ///
605    /// For an `async` version use [UseQuery::invalidate_async].
606    pub fn invalidate(&self) {
607        let storage = consume_context::<QueriesStorage<Q>>();
608
609        let query = self.query.peek().clone();
610        let map = storage.storage.peek();
611        let query_data = map.get(&query).cloned().unwrap();
612
613        // Run the query
614        spawn(async move { QueriesStorage::run_queries(&[(&query, &query_data)]).await });
615    }
616}
617
618/// Queries are used to get data asynchronously (e.g external resources such as HTTP APIs), which can later be cached or refreshed.
619///
620/// Important concepts:
621///
622/// ### Stale time
623/// This is how long will a value that is cached, considered to be recent enough.
624/// So in other words, if a value is stale it means that its outdated and therefore it should be refreshed.
625///
626/// By default the stale time is `0ms`, so if a value is cached and a new query subscriber
627/// is interested in this value, it will get refreshed automatically.
628///
629/// See [Query::stale_time].
630///
631/// ### Clean time
632/// This is how long will a value kept cached after there are no more subscribers of that query.
633///
634/// Imagine there is `Subscriber 1` of a query, the data is requested and cached.
635/// But after some seconds the `Subscriber 1` is unmounted, but the data is not cleared as the default clean time is `5min`.
636/// A few seconds later the `Subscriber 1` gets mounted again, it requests the data again but this time it is returned directly from the cache.
637///
638/// See [Query::clean_time].
639///
640/// ### Interval time
641/// This is how often do you want a query to be refreshed in the background automatically.
642/// By default it never refreshes automatically.
643///
644/// See [Query::interval_time].
645pub fn use_query<Q: QueryCapability>(query: Query<Q>) -> UseQuery<Q> {
646    let mut storage = match try_consume_context::<QueriesStorage<Q>>() {
647        Some(storage) => storage,
648        None => {
649            provide_context_for_scope_id(QueriesStorage::<Q>::new_in_root(), Some(ScopeId::ROOT));
650            try_consume_context::<QueriesStorage<Q>>().unwrap()
651        }
652    };
653
654    let mut make_query = |query: &Query<Q>, mut prev_query: Option<Query<Q>>| {
655        let query_data = storage.insert_or_get_query(query.clone());
656
657        // Update the query tasks if there has been a change in the query
658        if let Some(prev_query) = prev_query.take() {
659            storage.update_tasks(prev_query);
660        }
661
662        // Immediately run the query if enabled and the value is stale
663        if query.enabled && query_data.state.borrow().is_stale(query) {
664            let query = query.clone();
665            spawn(async move {
666                QueriesStorage::run_queries(&[(&query, &query_data)]).await;
667            });
668        }
669    };
670
671    let mut current_query = use_hook(|| {
672        make_query(&query, None);
673        State::create(query.clone())
674    });
675
676    if *current_query.read() != query {
677        let prev = mem::replace(&mut *current_query.write(), query.clone());
678        make_query(&query, Some(prev));
679    }
680
681    // Update the query tasks when the scope is dropped
682    use_drop({
683        move || {
684            storage.update_tasks(current_query.peek().clone());
685        }
686    });
687
688    UseQuery {
689        query: current_query,
690    }
691}