1use core::fmt;
2use std::{
3 cell::{
4 Ref,
5 RefCell,
6 },
7 collections::HashMap,
8 future::Future,
9 hash::Hash,
10 mem,
11 rc::Rc,
12 time::{
13 Duration,
14 Instant,
15 },
16};
17
18use async_io::Timer;
19use freya_core::{
20 integration::FxHashSet,
21 lifecycle::context::{
22 consume_context,
23 provide_context_for_scope_id,
24 try_consume_context,
25 },
26 prelude::*,
27 scope_id::ScopeId,
28};
29use futures_util::stream::{
30 FuturesUnordered,
31 StreamExt,
32};
33
34pub trait QueryCapability
35where
36 Self: 'static + Clone + PartialEq + Hash + Eq,
37{
38 type Ok;
39 type Err;
40 type Keys: Hash + PartialEq + Clone;
41
42 fn run(&self, keys: &Self::Keys) -> impl Future<Output = Result<Self::Ok, Self::Err>>;
44
45 fn matches(&self, _keys: &Self::Keys) -> bool {
47 true
48 }
49}
50
51pub enum QueryStateData<Q: QueryCapability> {
52 Pending,
54 Loading { res: Option<Result<Q::Ok, Q::Err>> },
56 Settled {
58 res: Result<Q::Ok, Q::Err>,
59 settlement_instant: Instant,
60 },
61}
62
63impl<Q: QueryCapability> TryFrom<QueryStateData<Q>> for Result<Q::Ok, Q::Err> {
64 type Error = ();
65
66 fn try_from(value: QueryStateData<Q>) -> Result<Self, Self::Error> {
67 match value {
68 QueryStateData::Loading { res: Some(res) } => Ok(res),
69 QueryStateData::Settled { res, .. } => Ok(res),
70 _ => Err(()),
71 }
72 }
73}
74
75impl<Q> fmt::Debug for QueryStateData<Q>
76where
77 Q: QueryCapability,
78 Q::Ok: fmt::Debug,
79 Q::Err: fmt::Debug,
80{
81 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
82 match self {
83 Self::Pending => f.write_str("Pending"),
84 Self::Loading { res } => write!(f, "Loading {{ {res:?} }}"),
85 Self::Settled { res, .. } => write!(f, "Settled {{ {res:?} }}"),
86 }
87 }
88}
89
90impl<Q: QueryCapability> QueryStateData<Q> {
91 pub fn is_ok(&self) -> bool {
93 matches!(self, QueryStateData::Settled { res: Ok(_), .. })
94 }
95
96 pub fn is_err(&self) -> bool {
98 matches!(self, QueryStateData::Settled { res: Err(_), .. })
99 }
100
101 pub fn is_loading(&self) -> bool {
103 matches!(self, QueryStateData::Loading { .. })
104 }
105
106 pub fn is_pending(&self) -> bool {
108 matches!(self, QueryStateData::Pending)
109 }
110
111 pub fn is_stale(&self, query: &Query<Q>) -> bool {
113 match self {
114 QueryStateData::Pending => true,
115 QueryStateData::Loading { .. } => true,
116 QueryStateData::Settled {
117 settlement_instant, ..
118 } => Instant::now().duration_since(*settlement_instant) >= query.stale_time,
119 }
120 }
121
122 pub fn ok(&self) -> Option<&Q::Ok> {
124 match self {
125 Self::Settled { res: Ok(res), .. } => Some(res),
126 Self::Loading { res: Some(Ok(res)) } => Some(res),
127 _ => None,
128 }
129 }
130
131 pub fn unwrap(&self) -> &Result<Q::Ok, Q::Err> {
133 match self {
134 Self::Loading { res: Some(v) } => v,
135 Self::Settled { res, .. } => res,
136 _ => unreachable!(),
137 }
138 }
139
140 fn into_loading(self) -> QueryStateData<Q> {
141 match self {
142 QueryStateData::Pending => QueryStateData::Loading { res: None },
143 QueryStateData::Loading { res } => QueryStateData::Loading { res },
144 QueryStateData::Settled { res, .. } => QueryStateData::Loading { res: Some(res) },
145 }
146 }
147}
148
149pub struct QueriesStorage<Q: QueryCapability> {
150 storage: State<HashMap<Query<Q>, QueryData<Q>>>,
151}
152
153impl<Q: QueryCapability> Copy for QueriesStorage<Q> {}
154
155impl<Q: QueryCapability> Clone for QueriesStorage<Q> {
156 fn clone(&self) -> Self {
157 *self
158 }
159}
160
161pub struct QueryData<Q: QueryCapability> {
162 state: Rc<RefCell<QueryStateData<Q>>>,
163 reactive_contexts: Rc<RefCell<FxHashSet<ReactiveContext>>>,
164
165 interval_task: Rc<RefCell<Option<(Duration, TaskHandle)>>>,
166 clean_task: Rc<RefCell<Option<TaskHandle>>>,
167}
168
169impl<Q: QueryCapability> Clone for QueryData<Q> {
170 fn clone(&self) -> Self {
171 Self {
172 state: self.state.clone(),
173 reactive_contexts: self.reactive_contexts.clone(),
174
175 interval_task: self.interval_task.clone(),
176 clean_task: self.clean_task.clone(),
177 }
178 }
179}
180
181impl<Q: QueryCapability> QueriesStorage<Q> {
182 fn new_in_root() -> Self {
183 Self {
184 storage: State::create_global(HashMap::default()),
185 }
186 }
187
188 fn insert_or_get_query(&mut self, query: Query<Q>) -> QueryData<Q> {
189 let query_clone = query.clone();
190 let mut storage = self.storage.write_unchecked();
191
192 let query_data = storage.entry(query).or_insert_with(|| QueryData {
193 state: Rc::new(RefCell::new(QueryStateData::Pending)),
194 reactive_contexts: Rc::new(RefCell::new(FxHashSet::default())),
195 interval_task: Rc::default(),
196 clean_task: Rc::default(),
197 });
198 let query_data_clone = query_data.clone();
199
200 if let Some(clean_task) = query_data.clean_task.take() {
202 clean_task.cancel();
203 }
204
205 let interval = query_clone.interval_time;
209 let interval_enabled = query_clone.interval_time != Duration::MAX;
210 let interval_task = &mut *query_data.interval_task.borrow_mut();
211
212 let create_interval_task = match interval_task {
213 None if interval_enabled => true,
214 Some((current_interval, current_interval_task)) if interval_enabled => {
215 let new_interval_is_shorter = *current_interval > interval;
216 if new_interval_is_shorter {
217 current_interval_task.cancel();
218 *interval_task = None;
219 }
220 new_interval_is_shorter
221 }
222 _ => false,
223 };
224 if create_interval_task {
225 let task = spawn_forever(async move {
226 loop {
227 Timer::after(interval).await;
229
230 QueriesStorage::<Q>::run_queries(&[(&query_clone, &query_data_clone)]).await;
232 }
233 });
234 *interval_task = Some((interval, task));
235 }
236
237 query_data.clone()
238 }
239
240 fn update_tasks(&mut self, query: Query<Q>) {
241 let storage_clone = self.storage;
242 let mut storage = self.storage.write_unchecked();
243
244 let query_data = storage.get_mut(&query).unwrap();
245
246 if let Some((_, interval_task)) = query_data.interval_task.take() {
248 interval_task.cancel();
249 }
250
251 if query_data.reactive_contexts.borrow().is_empty() {
253 *query_data.clean_task.borrow_mut() = Some(spawn_forever(async move {
254 Timer::after(query.clean_time).await;
256
257 let mut storage = storage_clone.write_unchecked();
259 storage.remove(&query);
260 }));
261 }
262 }
263
264 pub async fn get(get_query: GetQuery<Q>) -> QueryReader<Q> {
265 let query: Query<Q> = get_query.into();
266
267 let mut storage = match try_consume_context::<QueriesStorage<Q>>() {
268 Some(storage) => storage,
269 None => {
270 provide_context_for_scope_id(
271 QueriesStorage::<Q>::new_in_root(),
272 Some(ScopeId::ROOT),
273 );
274 try_consume_context::<QueriesStorage<Q>>().unwrap()
275 }
276 };
277
278 let mut map = storage.storage.write();
279 let query_data = map
280 .entry(query.clone())
281 .or_insert_with(|| QueryData {
282 state: Rc::new(RefCell::new(QueryStateData::Pending)),
283 reactive_contexts: Rc::new(RefCell::new(FxHashSet::default())),
284 interval_task: Rc::default(),
285 clean_task: Rc::default(),
286 })
287 .clone();
288
289 if query_data.state.borrow().is_stale(&query) {
291 let res = mem::replace(&mut *query_data.state.borrow_mut(), QueryStateData::Pending)
293 .into_loading();
294 *query_data.state.borrow_mut() = res;
295 for reactive_context in query_data.reactive_contexts.borrow().iter() {
296 reactive_context.notify();
297 }
298
299 let res = query.query.run(&query.keys).await;
301
302 *query_data.state.borrow_mut() = QueryStateData::Settled {
304 res,
305 settlement_instant: Instant::now(),
306 };
307 for reactive_context in query_data.reactive_contexts.borrow().iter() {
308 reactive_context.notify();
309 }
310 }
311
312 if query_data.reactive_contexts.borrow().is_empty() {
314 *query_data.clean_task.borrow_mut() = Some(spawn_forever(async move {
315 Timer::after(query.clean_time).await;
317
318 let mut storage = storage.storage.write_unchecked();
320 storage.remove(&query);
321 }));
322 }
323
324 QueryReader {
325 state: query_data.state,
326 }
327 }
328
329 pub async fn invalidate_all() {
330 let storage = consume_context::<QueriesStorage<Q>>();
331
332 let matching_queries = storage
334 .storage
335 .read()
336 .clone()
337 .into_iter()
338 .collect::<Vec<_>>();
339 let matching_queries = matching_queries
340 .iter()
341 .map(|(q, d)| (q, d))
342 .collect::<Vec<_>>();
343
344 Self::run_queries(&matching_queries).await
346 }
347
348 pub async fn invalidate_matching(matching_keys: Q::Keys) {
349 let storage = consume_context::<QueriesStorage<Q>>();
350
351 let mut matching_queries = Vec::new();
353 for (query, data) in storage.storage.read().iter() {
354 if query.query.matches(&matching_keys) {
355 matching_queries.push((query.clone(), data.clone()));
356 }
357 }
358 let matching_queries = matching_queries
359 .iter()
360 .map(|(q, d)| (q, d))
361 .collect::<Vec<_>>();
362
363 Self::run_queries(&matching_queries).await
365 }
366
367 async fn run_queries(queries: &[(&Query<Q>, &QueryData<Q>)]) {
368 let tasks = FuturesUnordered::new();
369
370 for (query, query_data) in queries {
371 let res = mem::replace(&mut *query_data.state.borrow_mut(), QueryStateData::Pending)
373 .into_loading();
374 *query_data.state.borrow_mut() = res;
375 for reactive_context in query_data.reactive_contexts.borrow().iter() {
376 reactive_context.notify();
377 }
378
379 tasks.push(Box::pin(async move {
380 let res = query.query.run(&query.keys).await;
382
383 *query_data.state.borrow_mut() = QueryStateData::Settled {
385 res,
386 settlement_instant: Instant::now(),
387 };
388 for reactive_context in query_data.reactive_contexts.borrow().iter() {
389 reactive_context.notify();
390 }
391 }));
392 }
393
394 tasks.count().await;
395 }
396}
397
398pub struct GetQuery<Q: QueryCapability> {
399 query: Q,
400 keys: Q::Keys,
401
402 stale_time: Duration,
403 clean_time: Duration,
404}
405
406impl<Q: QueryCapability> GetQuery<Q> {
407 pub fn new(keys: Q::Keys, query: Q) -> Self {
408 Self {
409 query,
410 keys,
411 stale_time: Duration::ZERO,
412 clean_time: Duration::ZERO,
413 }
414 }
415 pub fn stale_time(self, stale_time: Duration) -> Self {
419 Self { stale_time, ..self }
420 }
421
422 pub fn clean_time(self, clean_time: Duration) -> Self {
426 Self { clean_time, ..self }
427 }
428}
429
430impl<Q: QueryCapability> From<GetQuery<Q>> for Query<Q> {
431 fn from(value: GetQuery<Q>) -> Self {
432 Query {
433 query: value.query,
434 keys: value.keys,
435
436 enabled: true,
437
438 stale_time: value.stale_time,
439 clean_time: value.clean_time,
440 interval_time: Duration::MAX,
441 }
442 }
443}
444#[derive(PartialEq, Clone)]
445pub struct Query<Q: QueryCapability> {
446 query: Q,
447 keys: Q::Keys,
448
449 enabled: bool,
450
451 stale_time: Duration,
452 clean_time: Duration,
453 interval_time: Duration,
454}
455
456impl<Q: QueryCapability> Eq for Query<Q> {}
457impl<Q: QueryCapability> Hash for Query<Q> {
458 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
459 self.query.hash(state);
460 self.keys.hash(state);
461
462 self.enabled.hash(state);
463
464 self.stale_time.hash(state);
465 self.clean_time.hash(state);
466
467 }
470}
471
472impl<Q: QueryCapability> Query<Q> {
473 pub fn new(keys: Q::Keys, query: Q) -> Self {
474 Self {
475 query,
476 keys,
477 enabled: true,
478 stale_time: Duration::ZERO,
479 clean_time: Duration::from_secs(5 * 60),
480 interval_time: Duration::MAX,
481 }
482 }
483
484 pub fn enable(self, enabled: bool) -> Self {
488 Self { enabled, ..self }
489 }
490
491 pub fn stale_time(self, stale_time: Duration) -> Self {
496 Self { stale_time, ..self }
497 }
498
499 pub fn clean_time(self, clean_time: Duration) -> Self {
503 Self { clean_time, ..self }
504 }
505
506 pub fn interval_time(self, interval_time: Duration) -> Self {
512 Self {
513 interval_time,
514 ..self
515 }
516 }
517}
518
519pub struct QueryReader<Q: QueryCapability> {
520 state: Rc<RefCell<QueryStateData<Q>>>,
521}
522
523impl<Q: QueryCapability> QueryReader<Q> {
524 pub fn state(&'_ self) -> Ref<'_, QueryStateData<Q>> {
525 self.state.borrow()
526 }
527
528 pub fn as_settled(&'_ self) -> Ref<'_, Result<Q::Ok, Q::Err>> {
532 Ref::map(self.state.borrow(), |state| match state {
533 QueryStateData::Settled { res, .. } => res,
534 _ => panic!("Query is not settled."),
535 })
536 }
537}
538
539pub struct UseQuery<Q: QueryCapability> {
540 query: State<Query<Q>>,
541}
542
543impl<Q: QueryCapability> Clone for UseQuery<Q> {
544 fn clone(&self) -> Self {
545 *self
546 }
547}
548
549impl<Q: QueryCapability> Copy for UseQuery<Q> {}
550
551impl<Q: QueryCapability> UseQuery<Q> {
552 pub fn read(&self) -> QueryReader<Q> {
557 let storage = consume_context::<QueriesStorage<Q>>();
558 let map = storage.storage.peek();
559 let query_data = map.get(&self.query.peek()).cloned().unwrap();
560
561 if let Some(mut reactive_context) = ReactiveContext::try_current() {
563 reactive_context.subscribe(&query_data.reactive_contexts);
564 }
565
566 QueryReader {
567 state: query_data.state,
568 }
569 }
570
571 pub fn peek(&self) -> QueryReader<Q> {
576 let storage = consume_context::<QueriesStorage<Q>>();
577 let map = storage.storage.peek();
578 let query_data = map.get(&self.query.peek()).cloned().unwrap();
579
580 QueryReader {
581 state: query_data.state,
582 }
583 }
584
585 pub async fn invalidate_async(&self) -> QueryReader<Q> {
589 let storage = consume_context::<QueriesStorage<Q>>();
590
591 let query = self.query.peek().clone();
592 let map = storage.storage.peek();
593 let query_data = map.get(&query).cloned().unwrap();
594
595 QueriesStorage::run_queries(&[(&query, &query_data)]).await;
597
598 QueryReader {
599 state: query_data.state.clone(),
600 }
601 }
602
603 pub fn invalidate(&self) {
607 let storage = consume_context::<QueriesStorage<Q>>();
608
609 let query = self.query.peek().clone();
610 let map = storage.storage.peek();
611 let query_data = map.get(&query).cloned().unwrap();
612
613 spawn(async move { QueriesStorage::run_queries(&[(&query, &query_data)]).await });
615 }
616}
617
618pub fn use_query<Q: QueryCapability>(query: Query<Q>) -> UseQuery<Q> {
646 let mut storage = match try_consume_context::<QueriesStorage<Q>>() {
647 Some(storage) => storage,
648 None => {
649 provide_context_for_scope_id(QueriesStorage::<Q>::new_in_root(), Some(ScopeId::ROOT));
650 try_consume_context::<QueriesStorage<Q>>().unwrap()
651 }
652 };
653
654 let mut make_query = |query: &Query<Q>, mut prev_query: Option<Query<Q>>| {
655 let query_data = storage.insert_or_get_query(query.clone());
656
657 if let Some(prev_query) = prev_query.take() {
659 storage.update_tasks(prev_query);
660 }
661
662 if query.enabled && query_data.state.borrow().is_stale(query) {
664 let query = query.clone();
665 spawn(async move {
666 QueriesStorage::run_queries(&[(&query, &query_data)]).await;
667 });
668 }
669 };
670
671 let mut current_query = use_hook(|| {
672 make_query(&query, None);
673 State::create(query.clone())
674 });
675
676 if *current_query.read() != query {
677 let prev = mem::replace(&mut *current_query.write(), query.clone());
678 make_query(&query, Some(prev));
679 }
680
681 use_drop({
683 move || {
684 storage.update_tasks(current_query.peek().clone());
685 }
686 });
687
688 UseQuery {
689 query: current_query,
690 }
691}